1 /+++++++++++++++++++++++++++++ 2 + This module defines the concept of Observable. 3 +/ 4 module rx.observable; 5 6 import std.functional : unaryFun; 7 import std.range : put, isInputRange, isOutputRange, ElementType; 8 9 import rx.disposable; 10 import rx.observer; 11 import rx.util; 12 13 ///Tests if something is a Observable. 14 template isObservable(T, E) 15 { 16 enum bool isObservable = is(T.ElementType : E) && is(typeof({ 17 T observable = void; 18 Observer!E observer = void; 19 auto d = observable.subscribe(observer); 20 static assert(isDisposable!(typeof(d))); 21 }())); 22 } 23 /// 24 unittest 25 { 26 struct TestObservable 27 { 28 alias ElementType = int; 29 30 Disposable subscribe(T)(T observer) 31 { 32 static assert(isObserver!(T, int)); 33 return null; 34 } 35 } 36 37 static assert(isObservable!(TestObservable, int)); 38 static assert(!isObservable!(TestObservable, Object)); 39 } 40 41 ///Test if the observer can subscribe to the observable. 42 template isSubscribable(TObservable, TObserver) 43 { 44 enum bool isSubscribable = is(typeof({ 45 TObservable observable = void; 46 TObserver observer = void; 47 auto d = observable.subscribe(observer); 48 static assert(isDisposable!(typeof(d))); 49 }())); 50 } 51 /// 52 unittest 53 { 54 struct TestDisposable 55 { 56 void dispose() 57 { 58 } 59 } 60 61 struct TestObserver 62 { 63 void put(int n) 64 { 65 } 66 67 void completed() 68 { 69 } 70 71 void failure(Exception e) 72 { 73 } 74 } 75 76 struct TestObservable 77 { 78 TestDisposable subscribe(TestObserver observer) 79 { 80 return TestDisposable(); 81 } 82 } 83 84 static assert(isSubscribable!(TestObservable, TestObserver)); 85 } 86 87 ///The helper for subscribe easier. 88 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut, 89 void delegate() doCompleted, void delegate(Exception) doFailure) 90 { 91 return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure)); 92 } 93 ///ditto 94 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 95 void delegate(E) doPut, void delegate() doCompleted) 96 { 97 return doSubscribe(observable, makeObserver(doPut, doCompleted)); 98 } 99 ///ditto 100 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 101 void delegate(E) doPut, void delegate(Exception) doFailure) 102 { 103 return doSubscribe(observable, makeObserver(doPut, doFailure)); 104 } 105 ///ditto 106 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer) 107 { 108 alias ElementType = TObservable.ElementType; 109 static if (isSubscribable!(TObservable, TObserver)) 110 return observable.subscribe(observer); 111 else static if (isSubscribable!(TObservable, Observer!ElementType)) 112 return observable.subscribe(observerObject!ElementType(observer)); 113 else 114 static assert(false); 115 } 116 /// 117 unittest 118 { 119 struct TestObserver 120 { 121 void put(int n) 122 { 123 } 124 } 125 126 struct TestObservable1 127 { 128 alias ElementType = int; 129 Disposable subscribe(Observer!int observer) 130 { 131 return null; 132 } 133 } 134 135 struct TestObservable2 136 { 137 alias ElementType = int; 138 Disposable subscribe(T)(T observer) 139 { 140 return null; 141 } 142 } 143 144 TestObservable1 o1; 145 auto d0 = o1.doSubscribe((int n) { }, () { }, (Exception e) { }); 146 auto d1 = o1.doSubscribe((int n) { }, () { }); 147 auto d2 = o1.doSubscribe((int n) { }, (Exception e) { }); 148 auto d3 = o1.doSubscribe((int n) { }); 149 auto d4 = o1.doSubscribe(TestObserver()); 150 TestObservable2 o2; 151 auto d5 = o2.doSubscribe((int n) { }, () { }, (Exception e) { }); 152 auto d6 = o2.doSubscribe((int n) { }, () { }); 153 auto d7 = o2.doSubscribe((int n) { }, (Exception e) { }); 154 auto d8 = o2.doSubscribe((int n) { }); 155 auto d9 = o2.doSubscribe(TestObserver()); 156 } 157 158 ///Wrapper for Observable objects. 159 interface Observable(E) 160 { 161 alias ElementType = E; 162 163 Disposable subscribe(Observer!E observer); 164 } 165 166 unittest 167 { 168 static assert(isObservable!(Observable!int, int)); 169 } 170 171 ///Class that implements Observable interface and wraps the subscribe method in virtual function. 172 class ObservableObject(R, E) : Observable!E 173 { 174 public: 175 this(R observable) 176 { 177 _observable = observable; 178 } 179 180 public: 181 Disposable subscribe(Observer!E observer) 182 { 183 return disposableObject(_observable.subscribe(observer)); 184 } 185 186 private: 187 R _observable; 188 } 189 190 ///Wraps subscribe method in virtual function. 191 template observableObject(E) 192 { 193 Observable!E observableObject(R)(auto ref R observable) 194 { 195 static if (is(R : Observable!E)) 196 { 197 return observable; 198 } 199 else 200 { 201 return new ObservableObject!(R, E)(observable); 202 } 203 } 204 } 205 /// 206 unittest 207 { 208 int subscribeCount = 0; 209 class TestObservable : Observable!int 210 { 211 Disposable subscribe(Observer!int observer) 212 { 213 subscribeCount++; 214 return NopDisposable.instance; 215 } 216 } 217 218 auto test = new TestObservable; 219 auto observable = observableObject!int(test); 220 assert(observable is test); 221 assert(subscribeCount == 0); 222 auto d = observable.subscribe(null); 223 assert(subscribeCount == 1); 224 } 225 226 unittest 227 { 228 int disposeCount = 0; 229 int subscribeCount = 0; 230 231 struct TestDisposable 232 { 233 void dispose() 234 { 235 disposeCount++; 236 } 237 } 238 239 struct TestObservable 240 { 241 TestDisposable subscribe(Observer!int observer) 242 { 243 subscribeCount++; 244 return TestDisposable(); 245 } 246 } 247 248 Observable!int observable = observableObject!int(TestObservable()); 249 assert(subscribeCount == 0); 250 Disposable disposable = observable.subscribe(null); 251 assert(subscribeCount == 1); 252 assert(disposeCount == 0); 253 disposable.dispose(); 254 assert(disposeCount == 1); 255 } 256 257 //######################### 258 // Defer 259 //######################### 260 ///Create observable by function that template parameter. 261 auto defer(E, alias f)() 262 { 263 static struct DeferObservable 264 { 265 alias ElementType = E; 266 267 public: 268 auto subscribe(TObserver)(TObserver observer) 269 { 270 static struct DeferObserver 271 { 272 public: 273 this(TObserver observer, EventSignal signal) 274 { 275 _observer = observer; 276 _signal = signal; 277 } 278 279 public: 280 void put(E obj) 281 { 282 if (_signal.signal) 283 return; 284 285 static if (hasFailure!TObserver) 286 { 287 try 288 { 289 .put(_observer, obj); 290 } 291 catch (Exception e) 292 { 293 _observer.failure(e); 294 } 295 } 296 else 297 { 298 .put(_observer, obj); 299 } 300 } 301 302 void completed() 303 { 304 if (_signal.signal) 305 return; 306 _signal.setSignal(); 307 308 static if (hasCompleted!TObserver) 309 { 310 _observer.completed(); 311 } 312 } 313 314 void failure(Exception e) 315 { 316 if (_signal.signal) 317 return; 318 _signal.setSignal(); 319 320 static if (hasFailure!TObserver) 321 { 322 _observer.failure(e); 323 } 324 } 325 326 private: 327 TObserver _observer; 328 EventSignal _signal; 329 } 330 331 alias fun = unaryFun!f; 332 auto d = new SignalDisposable; 333 fun(DeferObserver(observer, d.signal)); 334 return d; 335 } 336 } 337 338 return DeferObservable(); 339 } 340 /// 341 unittest 342 { 343 auto sub = defer!(int, (observer) { 344 observer.put(1); 345 observer.put(2); 346 observer.put(3); 347 observer.completed(); 348 }); 349 350 int countPut = 0; 351 int countCompleted = 0; 352 struct A 353 { 354 void put(int n) 355 { 356 countPut++; 357 } 358 359 void completed() 360 { 361 countCompleted++; 362 } 363 } 364 365 assert(countPut == 0); 366 assert(countCompleted == 0); 367 auto d = sub.doSubscribe(A()); 368 assert(countPut == 3); 369 assert(countCompleted == 1); 370 } 371 372 unittest 373 { 374 auto sub = defer!(int, (observer) { 375 observer.put(0); 376 observer.failure(new Exception("")); 377 observer.put(1); 378 }); 379 380 int countPut = 0; 381 int countFailure = 0; 382 struct A 383 { 384 void put(int n) 385 { 386 countPut++; 387 } 388 389 void failure(Exception e) 390 { 391 countFailure++; 392 } 393 } 394 395 assert(countPut == 0); 396 assert(countFailure == 0); 397 auto d = sub.doSubscribe(A()); 398 assert(countPut == 1); 399 assert(countFailure == 1); 400 } 401 402 unittest 403 { 404 auto sub = defer!(int, (observer) { 405 observer.put(0); 406 observer.failure(new Exception("")); 407 observer.put(1); 408 }); 409 410 int countPut = 0; 411 struct A 412 { 413 void put(int n) 414 { 415 countPut++; 416 } 417 } 418 419 assert(countPut == 0); 420 auto d = sub.doSubscribe(A()); 421 assert(countPut == 1); 422 } 423 424 unittest 425 { 426 Disposable subscribeImpl(Observer!int observer) 427 { 428 .put(observer, 1); 429 return null; 430 } 431 432 import std.array : appender; 433 434 auto buf = appender!(int[]); 435 436 auto put1 = defer!int(&subscribeImpl); 437 auto d = put1.doSubscribe(buf); 438 439 assert(buf.data.length == 1); 440 assert(buf.data[0] == 1); 441 assert(d is null); 442 } 443 444 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl) 445 { 446 struct DeferObservable 447 { 448 alias ElementType = E; 449 450 TSubscribe _subscribeImpl; 451 452 this(ref TSubscribe subscribeImpl) 453 { 454 _subscribeImpl = subscribeImpl; 455 } 456 457 auto subscribe(TObserver)(auto ref TObserver observer) 458 { 459 return _subscribeImpl(observer); 460 } 461 } 462 463 return DeferObservable(subscribeImpl); 464 } 465 466 unittest 467 { 468 import std.array : appender; 469 470 auto buf = appender!(int[]); 471 472 auto put12 = defer!int((Observer!int observer) { 473 .put(observer, 1); 474 .put(observer, 2); 475 return NopDisposable.instance; 476 }); 477 auto d = put12.doSubscribe(buf); 478 479 assert(buf.data.length == 2); 480 assert(buf.data[0] == 1); 481 assert(buf.data[1] == 2); 482 } 483 484 auto empty(E)() 485 { 486 static struct EmptyObservable 487 { 488 alias ElementType = E; 489 490 Disposable subscribe(TObserver)(auto ref TObserver observer) 491 { 492 static if (hasCompleted!TObserver) 493 { 494 observer.completed(); 495 } 496 return NopDisposable.instance; 497 } 498 } 499 500 return EmptyObservable(); 501 } 502 503 unittest 504 { 505 auto completed = false; 506 auto o = empty!int(); 507 508 assert(!completed); 509 auto d = o.doSubscribe((int n) { }, () { completed = true; }); 510 assert(completed); 511 } 512 513 auto never(E)() 514 { 515 static struct NeverObservable 516 { 517 alias ElementType = E; 518 519 Disposable subscribe(TObserver)(auto ref TObserver observer) 520 { 521 return NopDisposable.instance; 522 } 523 } 524 525 return NeverObservable(); 526 } 527 528 unittest 529 { 530 auto o = never!int(); 531 auto d = o.doSubscribe((int) { }); 532 d.dispose(); 533 } 534 535 auto error(E)(auto ref Exception e) 536 { 537 static struct ErrorObservable 538 { 539 alias ElementType = E; 540 541 Exception _e; 542 543 this(ref Exception e) 544 { 545 _e = e; 546 } 547 548 Disposable subscribe(TObserver)(auto ref TObserver observer) 549 { 550 static if (hasFailure!TObserver) 551 { 552 observer.failure(_e); 553 } 554 return NopDisposable.instance; 555 } 556 } 557 558 return ErrorObservable(e); 559 } 560 561 unittest 562 { 563 auto expected = new Exception("TEST"); 564 auto o = error!int(expected); 565 566 Exception actual = null; 567 o.doSubscribe((int n) { }, (Exception e) { actual = e; }); 568 assert(actual is expected); 569 } 570 571 /// 572 auto from(R)(auto ref R input) if (isInputRange!R) 573 { 574 alias E = ElementType!R; 575 576 static struct FromObservable 577 { 578 alias ElementType = E; 579 580 this(ref R input) 581 { 582 this.input = input; 583 } 584 585 Disposable subscribe(TObserver)(auto ref TObserver observer) 586 if (isOutputRange!(TObserver, ElementType)) 587 { 588 .put(observer, input); 589 return NopDisposable.instance; 590 } 591 592 R input; 593 } 594 595 return FromObservable(input); 596 } 597 /// 598 alias asObservable = from; 599 600 /// 601 unittest 602 { 603 import std.range : iota; 604 605 auto obs = from(iota(10)); 606 auto res = new int[10]; 607 auto d = obs.subscribe(res[]); 608 scope (exit) 609 d.dispose(); 610 611 assert(res.length == 10); 612 assert(res[0] == 0); 613 assert(res[9] == 9); 614 } 615 616 /// 617 unittest 618 { 619 import std.range : iota; 620 621 auto obs = iota(10).asObservable(); 622 auto res = new int[10]; 623 auto d = obs.subscribe(res[]); 624 scope (exit) 625 d.dispose(); 626 627 assert(res.length == 10); 628 assert(res[0] == 0); 629 assert(res[9] == 9); 630 }