1 /+++++++++++++++++++++++++++++ 2 + This module defines the Subject and some implements. 3 +/ 4 module rx.subject; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 10 import core.atomic : atomicLoad, cas; 11 import std.range : put; 12 13 ///Represents an object that is both an observable sequence as well as an observer. 14 interface Subject(E) : Observer!E, Observable!E 15 { 16 } 17 18 ///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers. 19 class SubjectObject(E) : Subject!E 20 { 21 alias ElementType = E; 22 23 public: 24 /// 25 this() 26 { 27 _observer = cast(shared) NopObserver!E.instance; 28 } 29 30 public: 31 /// 32 void put(E obj) 33 { 34 auto temp = atomicLoad(_observer); 35 .put(temp, obj); 36 } 37 /// 38 void completed() 39 { 40 shared(Observer!E) oldObserver = void; 41 shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance; 42 Observer!E temp = void; 43 do 44 { 45 oldObserver = _observer; 46 temp = atomicLoad(oldObserver); 47 if (cast(DoneObserver!E) temp) 48 break; 49 } 50 while (!cas(&_observer, oldObserver, newObserver)); 51 temp.completed(); 52 } 53 /// 54 void failure(Exception error) 55 { 56 shared(Observer!E) oldObserver = void; 57 shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error); 58 Observer!E temp = void; 59 do 60 { 61 oldObserver = _observer; 62 temp = atomicLoad(oldObserver); 63 if (cast(DoneObserver!E) temp) 64 break; 65 } 66 while (!cas(&_observer, oldObserver, newObserver)); 67 temp.failure(error); 68 } 69 70 /// 71 Disposable subscribe(T)(T observer) 72 { 73 return subscribe(observerObject!E(observer)); 74 } 75 /// 76 Disposable subscribe(Observer!E observer) 77 { 78 shared(Observer!E) oldObserver = void; 79 shared(Observer!E) newObserver = void; 80 do 81 { 82 oldObserver = _observer; 83 auto temp = atomicLoad(oldObserver); 84 85 if (temp is DoneObserver!E.instance) 86 { 87 observer.completed(); 88 return NopDisposable.instance; 89 } 90 91 if (auto fail = cast(DoneObserver!E) temp) 92 { 93 observer.failure(fail.exception); 94 return NopDisposable.instance; 95 } 96 97 if (auto composite = cast(CompositeObserver!E) temp) 98 { 99 newObserver = cast(shared) composite.add(observer); 100 } 101 else if (auto nop = cast(NopObserver!E) temp) 102 { 103 newObserver = cast(shared) observer; 104 } 105 else 106 { 107 newObserver = cast(shared)(new CompositeObserver!E([temp, observer])); 108 } 109 } 110 while (!cas(&_observer, oldObserver, newObserver)); 111 112 return subscription(this, observer); 113 } 114 115 /// 116 void unsubscribe(Observer!E observer) 117 { 118 shared(Observer!E) oldObserver = void; 119 shared(Observer!E) newObserver = void; 120 do 121 { 122 oldObserver = _observer; 123 124 auto temp = atomicLoad(oldObserver); 125 if (auto composite = cast(CompositeObserver!E) temp) 126 { 127 newObserver = cast(shared) composite.remove(observer); 128 } 129 else 130 { 131 if (temp !is observer) 132 return; 133 134 newObserver = cast(shared) NopObserver!E.instance; 135 } 136 } 137 while (!cas(&_observer, oldObserver, newObserver)); 138 } 139 140 private: 141 shared(Observer!E) _observer; 142 } 143 144 /// 145 unittest 146 { 147 import std.array : appender; 148 149 auto data = appender!(int[])(); 150 auto subject = new SubjectObject!int; 151 auto disposable = subject.subscribe(observerObject!(int)(data)); 152 assert(disposable !is null); 153 subject.put(0); 154 subject.put(1); 155 156 import std.algorithm : equal; 157 158 assert(equal(data.data, [0, 1])); 159 160 disposable.dispose(); 161 subject.put(2); 162 assert(equal(data.data, [0, 1])); 163 } 164 165 unittest 166 { 167 static assert(isObserver!(SubjectObject!int, int)); 168 static assert(isObservable!(SubjectObject!int, int)); 169 static assert(!isObservable!(SubjectObject!int, string)); 170 static assert(!isObservable!(SubjectObject!int, string)); 171 } 172 173 unittest 174 { 175 auto subject = new SubjectObject!int; 176 auto observer = new CounterObserver!int; 177 auto disposable = subject.subscribe(observer); 178 scope (exit) 179 disposable.dispose(); 180 181 subject.put(0); 182 subject.put(1); 183 184 assert(observer.putCount == 2); 185 subject.completed(); 186 subject.put(2); 187 assert(observer.putCount == 2); 188 assert(observer.completedCount == 1); 189 } 190 191 unittest 192 { 193 auto subject = new SubjectObject!int; 194 auto observer = new CounterObserver!int; 195 auto disposable = subject.subscribe(observer); 196 scope (exit) 197 disposable.dispose(); 198 199 subject.put(0); 200 subject.put(1); 201 202 assert(observer.putCount == 2); 203 auto ex = new Exception("Exception"); 204 subject.failure(ex); 205 subject.put(2); 206 assert(observer.putCount == 2); 207 assert(observer.failureCount == 1); 208 assert(observer.lastException is ex); 209 } 210 211 unittest 212 { 213 import std.array : appender; 214 215 auto buf1 = appender!(int[]); 216 auto buf2 = appender!(int[]); 217 auto subject = new SubjectObject!int; 218 subject.subscribe(observerObject!(int)(buf1)); 219 subject.doSubscribe((int n) => buf2.put(n)); 220 221 assert(buf1.data.length == 0); 222 assert(buf2.data.length == 0); 223 subject.put(0); 224 assert(buf1.data.length == 1); 225 assert(buf2.data.length == 1); 226 assert(buf1.data[0] == buf2.data[0]); 227 } 228 229 unittest 230 { 231 auto sub = new SubjectObject!int; 232 sub.completed(); 233 234 auto observer = new CounterObserver!int; 235 assert(observer.putCount == 0); 236 assert(observer.completedCount == 0); 237 assert(observer.failureCount == 0); 238 sub.subscribe(observer); 239 assert(observer.putCount == 0); 240 assert(observer.completedCount == 1); 241 assert(observer.failureCount == 0); 242 } 243 244 unittest 245 { 246 auto sub = new SubjectObject!int; 247 auto ex = new Exception("Exception"); 248 sub.failure(ex); 249 250 auto observer = new CounterObserver!int; 251 assert(observer.putCount == 0); 252 assert(observer.completedCount == 0); 253 assert(observer.failureCount == 0); 254 sub.subscribe(observer); 255 assert(observer.putCount == 0); 256 assert(observer.completedCount == 0); 257 assert(observer.failureCount == 1); 258 assert(observer.lastException is ex); 259 } 260 261 private class Subscription(TSubject, TObserver) : Disposable 262 { 263 public: 264 this(TSubject subject, TObserver observer) 265 { 266 _subject = subject; 267 _observer = observer; 268 } 269 270 public: 271 void dispose() 272 { 273 if (_subject !is null) 274 { 275 _subject.unsubscribe(_observer); 276 _subject = null; 277 } 278 } 279 280 private: 281 TSubject _subject; 282 TObserver _observer; 283 } 284 285 private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)( 286 TSubject subject, TObserver observer) 287 { 288 return new typeof(return)(subject, observer); 289 } 290 291 /// 292 class AsyncSubject(E) : Subject!E 293 { 294 public: 295 /// 296 Disposable subscribe(Observer!E observer) 297 { 298 Exception ex = null; 299 E value; 300 bool hasValue = false; 301 302 synchronized (this) 303 { 304 if (!_isStopped) 305 { 306 _observers ~= observer; 307 return subscription(this, observer); 308 } 309 310 ex = _exception; 311 hasValue = _hasValue; 312 value = _value; 313 } 314 315 if (ex !is null) 316 { 317 observer.failure(ex); 318 } 319 else if (hasValue) 320 { 321 .put(observer, value); 322 observer.completed(); 323 } 324 else 325 { 326 observer.completed(); 327 } 328 329 return NopDisposable.instance; 330 } 331 332 /// 333 auto subscribe(T)(T observer) 334 { 335 return subscribe(observerObject!E(observer)); 336 } 337 338 /// 339 void unsubscribe(Observer!E observer) 340 { 341 if (observer is null) 342 return; 343 344 synchronized (this) 345 { 346 import std.algorithm : remove, countUntil; 347 348 auto index = countUntil(_observers, observer); 349 if (index != -1) 350 { 351 _observers = remove(_observers, index); 352 } 353 } 354 } 355 356 public: 357 /// 358 void put(E value) 359 { 360 synchronized (this) 361 { 362 if (!_isStopped) 363 { 364 _value = value; 365 _hasValue = true; 366 } 367 } 368 } 369 370 /// 371 void completed() 372 { 373 Observer!E[] os = null; 374 375 E value; 376 bool hasValue = false; 377 378 synchronized (this) 379 { 380 if (!_isStopped) 381 { 382 os = _observers; 383 _observers.length = 0; 384 _isStopped = true; 385 value = _value; 386 hasValue = _hasValue; 387 } 388 } 389 390 if (os) 391 { 392 if (hasValue) 393 { 394 foreach (observer; os) 395 { 396 .put(observer, value); 397 observer.completed(); 398 } 399 } 400 else 401 { 402 foreach (observer; os) 403 { 404 observer.completed(); 405 } 406 } 407 } 408 } 409 410 /// 411 void failure(Exception e) 412 { 413 assert(e !is null); 414 415 Observer!E[] os = null; 416 synchronized (this) 417 { 418 if (!_isStopped) 419 { 420 os = _observers; 421 _observers.length = 0; 422 _isStopped = true; 423 _exception = e; 424 } 425 } 426 427 if (os) 428 { 429 foreach (observer; os) 430 { 431 observer.failure(e); 432 } 433 } 434 } 435 436 private: 437 Observer!E[] _observers; 438 bool _isStopped; 439 E _value; 440 bool _hasValue; 441 Exception _exception; 442 } 443 444 unittest 445 { 446 auto sub = new AsyncSubject!int; 447 448 .put(sub, 1); 449 sub.completed(); 450 451 auto observer = new CounterObserver!int; 452 453 assert(observer.hasNotBeenCalled); 454 455 sub.subscribe(observer); 456 457 assert(observer.putCount == 1); 458 assert(observer.completedCount == 1); 459 assert(observer.failureCount == 0); 460 assert(observer.lastValue == 1); 461 } 462 463 unittest 464 { 465 auto sub = new AsyncSubject!int; 466 auto observer = new CounterObserver!int; 467 468 auto d = sub.subscribe(observer); 469 scope (exit) 470 d.dispose(); 471 472 assert(observer.hasNotBeenCalled); 473 474 sub.put(100); 475 476 assert(observer.hasNotBeenCalled); 477 478 assert(sub._hasValue); 479 assert(sub._value == 100); 480 481 sub.completed(); 482 483 assert(observer.putCount == 1); 484 assert(observer.completedCount == 1); 485 assert(observer.failureCount == 0); 486 assert(observer.lastValue == 100); 487 } 488 489 unittest 490 { 491 auto sub = new AsyncSubject!int; 492 auto observer = new CounterObserver!int; 493 494 sub.put(100); 495 496 assert(sub._hasValue); 497 assert(sub._value == 100); 498 499 auto d = sub.subscribe(observer); 500 scope (exit) 501 d.dispose(); 502 503 assert(observer.hasNotBeenCalled); 504 505 sub.completed(); 506 507 assert(observer.putCount == 1); 508 assert(observer.completedCount == 1); 509 assert(observer.failureCount == 0); 510 assert(observer.lastValue == 100); 511 } 512 513 unittest 514 { 515 auto sub = new AsyncSubject!int; 516 auto observer = new CounterObserver!int; 517 518 auto d = sub.subscribe(observer); 519 520 d.dispose(); 521 assert(observer.hasNotBeenCalled); 522 523 sub.put(100); 524 assert(observer.hasNotBeenCalled); 525 526 sub.completed(); 527 assert(observer.hasNotBeenCalled); 528 } 529 530 unittest 531 { 532 auto sub = new AsyncSubject!int; 533 auto observer = new CounterObserver!int; 534 535 auto d = sub.subscribe(observer); 536 assert(observer.hasNotBeenCalled); 537 538 sub.put(100); 539 assert(observer.hasNotBeenCalled); 540 541 d.dispose(); 542 assert(observer.hasNotBeenCalled); 543 544 sub.completed(); 545 assert(observer.hasNotBeenCalled); 546 } 547 548 unittest 549 { 550 551 auto sub = new AsyncSubject!int; 552 auto observer = new CounterObserver!int; 553 554 sub.put(100); 555 assert(observer.hasNotBeenCalled); 556 557 auto d = sub.subscribe(observer); 558 assert(observer.hasNotBeenCalled); 559 560 d.dispose(); 561 assert(observer.hasNotBeenCalled); 562 563 sub.completed(); 564 assert(observer.hasNotBeenCalled); 565 } 566 567 unittest 568 { 569 auto sub = new AsyncSubject!int; 570 auto observer = new CounterObserver!int; 571 572 auto d = sub.subscribe(observer); 573 scope (exit) 574 d.dispose(); 575 576 assert(observer.hasNotBeenCalled); 577 578 sub.completed(); 579 580 assert(observer.putCount == 0); 581 assert(observer.completedCount == 1); 582 assert(observer.failureCount == 0); 583 } 584 585 unittest 586 { 587 auto sub = new AsyncSubject!int; 588 auto observer = new CounterObserver!int; 589 590 auto d = sub.subscribe(observer); 591 scope (exit) 592 d.dispose(); 593 594 assert(observer.hasNotBeenCalled); 595 596 auto ex = new Exception("TEST"); 597 sub.failure(ex); 598 599 assert(observer.putCount == 0); 600 assert(observer.completedCount == 0); 601 assert(observer.failureCount == 1); 602 assert(observer.lastException is ex); 603 } 604 605 unittest 606 { 607 auto sub = new AsyncSubject!int; 608 auto ex = new Exception("TEST"); 609 sub.failure(ex); 610 611 auto observer = new CounterObserver!int; 612 613 auto d = sub.subscribe(observer); 614 scope (exit) 615 d.dispose(); 616 617 assert(observer.putCount == 0); 618 assert(observer.completedCount == 0); 619 assert(observer.failureCount == 1); 620 assert(observer.lastException is ex); 621 } 622 623 unittest 624 { 625 auto sub = new AsyncSubject!int; 626 auto observer = new CounterObserver!int; 627 628 sub.completed(); 629 assert(observer.hasNotBeenCalled); 630 631 sub.subscribe(observer); 632 assert(observer.putCount == 0); 633 assert(observer.completedCount == 1); 634 assert(observer.failureCount == 0); 635 } 636 637 version (unittest) 638 { 639 class CounterObserver(T) : Observer!T 640 { 641 public: 642 size_t putCount; 643 size_t completedCount; 644 size_t failureCount; 645 T lastValue; 646 Exception lastException; 647 648 public: 649 bool hasNotBeenCalled() const pure nothrow @nogc @safe @property 650 { 651 return putCount == 0 && completedCount == 0 && failureCount == 0; 652 } 653 654 public: 655 void put(T obj) 656 { 657 putCount++; 658 lastValue = obj; 659 } 660 661 void completed() 662 { 663 completedCount++; 664 } 665 666 void failure(Exception e) 667 { 668 failureCount++; 669 lastException = e; 670 } 671 } 672 } 673 674 /// 675 class BehaviorSubject(E) : Subject!E 676 { 677 public: 678 /// 679 this() 680 { 681 this(E.init); 682 } 683 684 /// 685 this(E value) 686 { 687 _subject = new SubjectObject!E; 688 _value = value; 689 } 690 691 public: 692 /// 693 inout(E) value() inout @property 694 { 695 return _value; 696 } 697 698 /// 699 void value(E value) @property 700 { 701 if (_value != value) 702 { 703 _value = value; 704 .put(_subject, value); 705 } 706 } 707 708 public: 709 /// 710 auto subscribe(TObserver)(auto ref TObserver observer) 711 { 712 .put(observer, value); 713 return _subject.doSubscribe(observer); 714 } 715 716 /// 717 Disposable subscribe(Observer!E observer) 718 { 719 .put(observer, value); 720 return disposableObject(_subject.doSubscribe(observer)); 721 } 722 723 /// 724 void put(E obj) 725 { 726 value = obj; 727 } 728 729 /// 730 void completed() 731 { 732 _subject.completed(); 733 } 734 735 /// 736 void failure(Exception e) 737 { 738 _subject.failure(e); 739 } 740 741 private: 742 SubjectObject!E _subject; 743 E _value; 744 } 745 746 unittest 747 { 748 static assert(isObservable!(BehaviorSubject!int, int)); 749 static assert(is(BehaviorSubject!int.ElementType == int)); 750 } 751 752 unittest 753 { 754 int num = 0; 755 auto subject = new BehaviorSubject!int(100); 756 757 auto d = subject.doSubscribe((int n) { num = n; }); 758 assert(num == 100); 759 760 .put(subject, 1); 761 assert(num == 1); 762 763 d.dispose(); 764 .put(subject, 10); 765 assert(num == 1); 766 }