1 /+++++++++++++++++++++++++++++ 2 + This module defines the concept of Observable. 3 +/ 4 module rx.observable; 5 6 import std.functional : unaryFun; 7 import std.range : put, isInputRange, isOutputRange, ElementType; 8 9 import rx.disposable; 10 import rx.observer; 11 import rx.util; 12 13 ///Tests if something is a Observable. 14 template isObservable(T, E) 15 { 16 enum bool isObservable = is(T.ElementType : E) && is(typeof({ 17 T observable = void; 18 Observer!E observer = void; 19 auto d = observable.subscribe(observer); 20 static assert(isDisposable!(typeof(d))); 21 }())); 22 } 23 /// 24 unittest 25 { 26 struct TestObservable 27 { 28 alias ElementType = int; 29 30 Disposable subscribe(T)(T observer) 31 { 32 static assert(isObserver!(T, int)); 33 return null; 34 } 35 } 36 37 static assert(isObservable!(TestObservable, int)); 38 static assert(!isObservable!(TestObservable, Object)); 39 } 40 41 ///Test if the observer can subscribe to the observable. 42 template isSubscribable(TObservable, TObserver) 43 { 44 enum bool isSubscribable = is(typeof({ 45 TObservable observable = void; 46 TObserver observer = void; 47 auto d = observable.subscribe(observer); 48 static assert(isDisposable!(typeof(d))); 49 }())); 50 } 51 /// 52 unittest 53 { 54 struct TestDisposable 55 { 56 void dispose() 57 { 58 } 59 } 60 61 struct TestObserver 62 { 63 void put(int n) 64 { 65 } 66 67 void completed() 68 { 69 } 70 71 void failure(Exception e) 72 { 73 } 74 } 75 76 struct TestObservable 77 { 78 TestDisposable subscribe(TestObserver observer) 79 { 80 return TestDisposable(); 81 } 82 } 83 84 static assert(isSubscribable!(TestObservable, TestObserver)); 85 } 86 87 ///The helper for subscribe easier. 88 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut, 89 void delegate() doCompleted, void delegate(Exception) doFailure) 90 { 91 return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure)); 92 } 93 ///ditto 94 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 95 void delegate(E) doPut, void delegate() doCompleted) 96 { 97 return doSubscribe(observable, makeObserver(doPut, doCompleted)); 98 } 99 ///ditto 100 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 101 void delegate(E) doPut, void delegate(Exception) doFailure) 102 { 103 return doSubscribe(observable, makeObserver(doPut, doFailure)); 104 } 105 ///ditto 106 auto doSubscribe(alias f, TObservable)(auto ref TObservable observable) 107 { 108 alias fun = unaryFun!f; 109 return doSubscribe(observable, (TObservable.ElementType obj) { fun(obj); }); 110 } 111 ///ditto 112 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer) 113 { 114 import std.format : format; 115 116 static assert(isObservable!(TObservable, TObservable.ElementType), 117 format!"%s is invalid as an Observable"(TObservable.stringof)); 118 119 alias ElementType = TObservable.ElementType; 120 static if (isSubscribable!(TObservable, TObserver)) 121 return observable.subscribe(observer); 122 else static if (isSubscribable!(TObservable, Observer!ElementType)) 123 return observable.subscribe(observerObject!ElementType(observer)); 124 else 125 { 126 static assert(false, format!"%s can not subscribe '%s', it published by %s"( 127 TObserver.stringof, ElementType.stringof, TObservable.stringof)); 128 } 129 } 130 /// 131 unittest 132 { 133 struct TestObservable 134 { 135 alias ElementType = int; 136 137 auto subscribe(TObserver)(TObserver observer) 138 { 139 .put(observer, [0, 1, 2]); 140 return NopDisposable.instance; 141 } 142 } 143 144 TestObservable observable; 145 int[] result; 146 observable.doSubscribe!(n => result ~= n); 147 assert(result.length == 3); 148 } 149 150 /// 151 unittest 152 { 153 struct TestObserver 154 { 155 void put(int n) 156 { 157 } 158 } 159 160 struct TestObservable1 161 { 162 alias ElementType = int; 163 Disposable subscribe(Observer!int observer) 164 { 165 return null; 166 } 167 } 168 169 struct TestObservable2 170 { 171 alias ElementType = int; 172 Disposable subscribe(T)(T observer) 173 { 174 return null; 175 } 176 } 177 178 TestObservable1 o1; 179 auto d0 = o1.doSubscribe((int n) { }, () { }, (Exception e) { }); 180 auto d1 = o1.doSubscribe((int n) { }, () { }); 181 auto d2 = o1.doSubscribe((int n) { }, (Exception e) { }); 182 auto d3 = o1.doSubscribe((int n) { }); 183 auto d4 = o1.doSubscribe(TestObserver()); 184 TestObservable2 o2; 185 auto d5 = o2.doSubscribe((int n) { }, () { }, (Exception e) { }); 186 auto d6 = o2.doSubscribe((int n) { }, () { }); 187 auto d7 = o2.doSubscribe((int n) { }, (Exception e) { }); 188 auto d8 = o2.doSubscribe((int n) { }); 189 auto d9 = o2.doSubscribe(TestObserver()); 190 } 191 192 ///Wrapper for Observable objects. 193 interface Observable(E) 194 { 195 alias ElementType = E; 196 197 Disposable subscribe(Observer!E observer); 198 } 199 200 unittest 201 { 202 static assert(isObservable!(Observable!int, int)); 203 } 204 205 unittest 206 { 207 static struct TestNoObservable 208 { 209 Disposable subscribe(Observer!int observer) 210 { 211 return null; 212 } 213 } 214 215 static assert(!isObservable!(TestNoObservable, int)); 216 } 217 218 ///Class that implements Observable interface and wraps the subscribe method in virtual function. 219 class ObservableObject(R, E) : Observable!E 220 { 221 public: 222 this(R observable) 223 { 224 _observable = observable; 225 } 226 227 public: 228 Disposable subscribe(Observer!E observer) 229 { 230 return disposableObject(_observable.subscribe(observer)); 231 } 232 233 private: 234 R _observable; 235 } 236 237 ///Wraps subscribe method in virtual function. 238 template observableObject(E) 239 { 240 Observable!E observableObject(R)(auto ref R observable) 241 { 242 static if (is(R : Observable!E)) 243 { 244 return observable; 245 } 246 else 247 { 248 return new ObservableObject!(R, E)(observable); 249 } 250 } 251 } 252 /// 253 unittest 254 { 255 int subscribeCount = 0; 256 class TestObservable : Observable!int 257 { 258 Disposable subscribe(Observer!int observer) 259 { 260 subscribeCount++; 261 return NopDisposable.instance; 262 } 263 } 264 265 auto test = new TestObservable; 266 auto observable = observableObject!int(test); 267 assert(observable is test); 268 assert(subscribeCount == 0); 269 auto d = observable.subscribe(null); 270 assert(subscribeCount == 1); 271 } 272 273 unittest 274 { 275 int disposeCount = 0; 276 int subscribeCount = 0; 277 278 struct TestDisposable 279 { 280 void dispose() 281 { 282 disposeCount++; 283 } 284 } 285 286 struct TestObservable 287 { 288 TestDisposable subscribe(Observer!int observer) 289 { 290 subscribeCount++; 291 return TestDisposable(); 292 } 293 } 294 295 Observable!int observable = observableObject!int(TestObservable()); 296 assert(subscribeCount == 0); 297 Disposable disposable = observable.subscribe(null); 298 assert(subscribeCount == 1); 299 assert(disposeCount == 0); 300 disposable.dispose(); 301 assert(disposeCount == 1); 302 } 303 304 //######################### 305 // Defer 306 //######################### 307 ///Create observable by function that template parameter. 308 auto defer(E, alias f)() 309 { 310 static struct DeferObservable 311 { 312 alias ElementType = E; 313 314 public: 315 auto subscribe(TObserver)(TObserver observer) 316 { 317 static struct DeferObserver 318 { 319 public: 320 this(TObserver observer, EventSignal signal) 321 { 322 _observer = observer; 323 _signal = signal; 324 } 325 326 public: 327 void put(E obj) 328 { 329 if (_signal.signal) 330 return; 331 332 static if (hasFailure!TObserver) 333 { 334 try 335 { 336 .put(_observer, obj); 337 } 338 catch (Exception e) 339 { 340 _observer.failure(e); 341 } 342 } 343 else 344 { 345 .put(_observer, obj); 346 } 347 } 348 349 void completed() 350 { 351 if (_signal.signal) 352 return; 353 _signal.setSignal(); 354 355 static if (hasCompleted!TObserver) 356 { 357 _observer.completed(); 358 } 359 } 360 361 void failure(Exception e) 362 { 363 if (_signal.signal) 364 return; 365 _signal.setSignal(); 366 367 static if (hasFailure!TObserver) 368 { 369 _observer.failure(e); 370 } 371 } 372 373 private: 374 TObserver _observer; 375 EventSignal _signal; 376 } 377 378 alias fun = unaryFun!f; 379 auto d = new SignalDisposable; 380 fun(DeferObserver(observer, d.signal)); 381 return d; 382 } 383 } 384 385 return DeferObservable(); 386 } 387 /// 388 unittest 389 { 390 auto sub = defer!(int, (observer) { 391 observer.put(1); 392 observer.put(2); 393 observer.put(3); 394 observer.completed(); 395 }); 396 397 int countPut = 0; 398 int countCompleted = 0; 399 struct A 400 { 401 void put(int n) 402 { 403 countPut++; 404 } 405 406 void completed() 407 { 408 countCompleted++; 409 } 410 } 411 412 assert(countPut == 0); 413 assert(countCompleted == 0); 414 auto d = sub.doSubscribe(A()); 415 assert(countPut == 3); 416 assert(countCompleted == 1); 417 } 418 419 unittest 420 { 421 auto sub = defer!(int, (observer) { 422 observer.put(0); 423 observer.failure(new Exception("")); 424 observer.put(1); 425 }); 426 427 int countPut = 0; 428 int countFailure = 0; 429 struct A 430 { 431 void put(int n) 432 { 433 countPut++; 434 } 435 436 void failure(Exception e) 437 { 438 countFailure++; 439 } 440 } 441 442 assert(countPut == 0); 443 assert(countFailure == 0); 444 auto d = sub.doSubscribe(A()); 445 assert(countPut == 1); 446 assert(countFailure == 1); 447 } 448 449 unittest 450 { 451 auto sub = defer!(int, (observer) { 452 observer.put(0); 453 observer.failure(new Exception("")); 454 observer.put(1); 455 }); 456 457 int countPut = 0; 458 struct A 459 { 460 void put(int n) 461 { 462 countPut++; 463 } 464 } 465 466 assert(countPut == 0); 467 auto d = sub.doSubscribe(A()); 468 assert(countPut == 1); 469 } 470 471 unittest 472 { 473 Disposable subscribeImpl(Observer!int observer) 474 { 475 .put(observer, 1); 476 return null; 477 } 478 479 import std.array : appender; 480 481 auto buf = appender!(int[]); 482 483 auto put1 = defer!int(&subscribeImpl); 484 auto d = put1.doSubscribe(buf); 485 486 assert(buf.data.length == 1); 487 assert(buf.data[0] == 1); 488 assert(d is null); 489 } 490 491 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl) 492 { 493 struct DeferObservable 494 { 495 alias ElementType = E; 496 497 TSubscribe _subscribeImpl; 498 499 this(ref TSubscribe subscribeImpl) 500 { 501 _subscribeImpl = subscribeImpl; 502 } 503 504 auto subscribe(TObserver)(auto ref TObserver observer) 505 { 506 return _subscribeImpl(observer); 507 } 508 } 509 510 return DeferObservable(subscribeImpl); 511 } 512 513 unittest 514 { 515 import std.array : appender; 516 517 auto buf = appender!(int[]); 518 519 auto put12 = defer!int((Observer!int observer) { 520 .put(observer, 1); 521 .put(observer, 2); 522 return NopDisposable.instance; 523 }); 524 auto d = put12.doSubscribe(buf); 525 526 assert(buf.data.length == 2); 527 assert(buf.data[0] == 1); 528 assert(buf.data[1] == 2); 529 } 530 531 auto empty(E)() 532 { 533 static struct EmptyObservable 534 { 535 alias ElementType = E; 536 537 Disposable subscribe(TObserver)(auto ref TObserver observer) 538 { 539 static if (hasCompleted!TObserver) 540 { 541 observer.completed(); 542 } 543 return NopDisposable.instance; 544 } 545 } 546 547 return EmptyObservable(); 548 } 549 550 unittest 551 { 552 auto completed = false; 553 auto o = empty!int(); 554 555 assert(!completed); 556 auto d = o.doSubscribe((int n) { }, () { completed = true; }); 557 assert(completed); 558 } 559 560 auto never(E)() 561 { 562 static struct NeverObservable 563 { 564 alias ElementType = E; 565 566 Disposable subscribe(TObserver)(auto ref TObserver observer) 567 { 568 return NopDisposable.instance; 569 } 570 } 571 572 return NeverObservable(); 573 } 574 575 unittest 576 { 577 auto o = never!int(); 578 auto d = o.doSubscribe((int) { }); 579 d.dispose(); 580 } 581 582 auto error(E)(auto ref Exception e) 583 { 584 static struct ErrorObservable 585 { 586 alias ElementType = E; 587 588 Exception _e; 589 590 this(ref Exception e) 591 { 592 _e = e; 593 } 594 595 Disposable subscribe(TObserver)(auto ref TObserver observer) 596 { 597 static if (hasFailure!TObserver) 598 { 599 observer.failure(_e); 600 } 601 return NopDisposable.instance; 602 } 603 } 604 605 return ErrorObservable(e); 606 } 607 608 unittest 609 { 610 auto expected = new Exception("TEST"); 611 auto o = error!int(expected); 612 613 Exception actual = null; 614 o.doSubscribe((int n) { }, (Exception e) { actual = e; }); 615 assert(actual is expected); 616 } 617 618 /// 619 auto from(R)(auto ref R input) if (isInputRange!R) 620 { 621 alias E = ElementType!R; 622 623 static struct FromObservable 624 { 625 alias ElementType = E; 626 627 this(ref R input) 628 { 629 this.input = input; 630 } 631 632 Disposable subscribe(TObserver)(auto ref TObserver observer) 633 if (isOutputRange!(TObserver, ElementType)) 634 { 635 .put(observer, input); 636 return NopDisposable.instance; 637 } 638 639 R input; 640 } 641 642 return FromObservable(input); 643 } 644 /// 645 alias asObservable = from; 646 647 /// 648 unittest 649 { 650 import std.range : iota; 651 652 auto obs = from(iota(10)); 653 auto res = new int[10]; 654 auto d = obs.subscribe(res[]); 655 scope (exit) 656 d.dispose(); 657 658 assert(res.length == 10); 659 assert(res[0] == 0); 660 assert(res[9] == 9); 661 } 662 663 /// 664 unittest 665 { 666 import std.range : iota; 667 668 auto obs = iota(10).asObservable(); 669 auto res = new int[10]; 670 auto d = obs.subscribe(res[]); 671 scope (exit) 672 d.dispose(); 673 674 assert(res.length == 10); 675 assert(res[0] == 0); 676 assert(res[9] == 9); 677 }