/+++++++++++++++++++++++++++++
 + This module defines the Subject interface and several implementations.
 +/
module rx.subject;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util : assumeThreadLocal;

import core.atomic : atomicLoad, cas;
import std.range : put;

///Represents an object that is both an observable sequence as well as an observer.
interface Subject(E) : Observer!E, Observable!E
{
}

/++
 + Represents an object that is both an observable sequence as well as an observer.
 + Each notification is broadcasted to all subscribed observers.
 +
 + The current set of subscribers is held in a single shared reference that is
 + swapped with compare-and-swap loops; no lock is taken.
 +/
class SubjectObject(E) : Subject!E
{
    alias ElementType = E;

public:
    /// Creates a subject whose stored observer is the NopObserver instance.
    this()
    {
        _observer = cast(shared) NopObserver!E.instance;
    }

public:
    /++
     + Forwards a value to the currently stored observer.
     +
     + Params:
     +   obj = the value to forward.
     +/
    void put(E obj)
    {
        auto temp = assumeThreadLocal(atomicLoad(_observer));
        .put(temp, obj);
    }

    /++
     + Replaces the stored observer with the shared DoneObserver instance and
     + calls completed() on the observer that was stored before the swap.
     +
     + If the stored observer is already a DoneObserver, nothing is swapped and
     + completed() is called on that DoneObserver instead.
     +/
    void completed()
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
        Observer!E temp = void;
        do
        {
            // Load the field atomically once; the thread-local view and the
            // CAS comparand are both derived from that single load.
            temp = assumeThreadLocal(atomicLoad(_observer));
            oldObserver = cast(shared) temp;
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.completed();
    }

    /++
     + Replaces the stored observer with a new DoneObserver carrying the error
     + and calls failure(error) on the observer that was stored before the swap.
     +
     + If the stored observer is already a DoneObserver, nothing is swapped and
     + failure(error) is called on that DoneObserver instead.
     +
     + Params:
     +   error = the exception to forward.
     +/
    void failure(Exception error)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
        Observer!E temp = void;
        do
        {
            // Same single atomic load as in completed().
            temp = assumeThreadLocal(atomicLoad(_observer));
            oldObserver = cast(shared) temp;
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.failure(error);
    }

    /++
     + Wraps an arbitrary observer with observerObject!E and subscribes it.
     +
     + Params:
     +   observer = the value passed to observerObject!E.
     + Returns: the result of subscribe(Observer!E).
     +/
    Disposable subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /++
     + Subscribes an observer to this subject.
     +
     + When the stored observer is a DoneObserver the new observer is notified
     + immediately (completed() for the shared instance, failure() with the
     + stored exception otherwise) and NopDisposable.instance is returned.
     +
     + Params:
     +   observer = the observer to add.
     + Returns: a Disposable; for a live subject it is a Subscription whose
     +          dispose() calls unsubscribe(observer) on this subject.
     +/
    Disposable subscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            auto temp = assumeThreadLocal(atomicLoad(_observer));
            oldObserver = cast(shared) temp;

            // The shared DoneObserver instance is the one completed() installs.
            if (temp is DoneObserver!E.instance)
            {
                observer.completed();
                return NopDisposable.instance;
            }

            // Any other DoneObserver was installed by failure().
            if (auto fail = cast(DoneObserver!E) temp)
            {
                observer.failure(fail.exception);
                return NopDisposable.instance;
            }

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.add(observer);
            }
            else if (cast(NopObserver!E) temp)
            {
                // No subscriber yet: the new observer becomes the stored one.
                newObserver = cast(shared) observer;
            }
            else
            {
                // Exactly one subscriber so far: combine it with the new one.
                newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));

        return subscription(this, observer);
    }

    /++
     + Removes an observer from this subject.
     +
     + If the stored observer is neither a CompositeObserver nor the given
     + observer itself, the call returns without changing anything.
     +
     + Params:
     +   observer = the observer to remove.
     +/
    void unsubscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            auto temp = assumeThreadLocal(atomicLoad(_observer));
            oldObserver = cast(shared) temp;

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.remove(observer);
            }
            else
            {
                if (temp !is observer)
                    return;

                newObserver = cast(shared) NopObserver!E.instance;
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));
    }

private:
    // The currently stored observer; replaced only through cas().
    shared(Observer!E) _observer;
}

///
unittest
{
    import std.array : appender;

    auto data = appender!(int[])();
    auto subject = new SubjectObject!int;
    auto disposable = subject.subscribe(observerObject!(int)(data));
    assert(disposable !is null);
    subject.put(0);
    subject.put(1);

    import std.algorithm : equal;

    assert(equal(data.data, [0, 1]));

    disposable.dispose();
    subject.put(2);
    assert(equal(data.data, [0, 1]));
}

unittest
{
    static assert(isObserver!(SubjectObject!int, int));
    static assert(isObservable!(SubjectObject!int, int));
    static assert(!isObservable!(SubjectObject!int, string));
}

unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    subject.completed();
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.completedCount == 1);
}

unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    auto ex = new Exception("Exception");
    subject.failure(ex);
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

unittest
{
    import std.array : appender;

    auto buf1 = appender!(int[]);
    auto buf2 = appender!(int[]);
    auto subject = new SubjectObject!int;
    subject.subscribe(observerObject!(int)(buf1));
    subject.doSubscribe((int n) => buf2.put(n));

    assert(buf1.data.length == 0);
    assert(buf2.data.length == 0);
    subject.put(0);
    assert(buf1.data.length == 1);
    assert(buf2.data.length == 1);
    assert(buf1.data[0] == buf2.data[0]);
}

unittest
{
    auto sub = new SubjectObject!int;
    sub.completed();

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

unittest
{
    auto sub = new SubjectObject!int;
    auto ex = new Exception("Exception");
    sub.failure(ex);

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Disposable that calls unsubscribe(observer) on its subject the first time
// dispose() is invoked.
private class Subscription(TSubject, TObserver) : Disposable
{
public:
    this(TSubject subject, TObserver observer)
    {
        _subject = subject;
        _observer = observer;
    }

public:
    void dispose()
    {
        if (_subject !is null)
        {
            _subject.unsubscribe(_observer);
            // Clearing the subject turns later dispose() calls into no-ops.
            _subject = null;
        }
    }

private:
    TSubject _subject;
    TObserver _observer;
}

// Factory that infers the Subscription template arguments from its parameters.
private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
        TSubject subject, TObserver observer)
{
    return new typeof(return)(subject, observer);
}

/++
 + A subject that remembers only the most recently put value and delivers it
 + to its observers when completed() is called.
 +
 + Internal state is guarded by synchronized (this); observer callbacks are
 + invoked after the synchronized block has been left.
 +/
class AsyncSubject(E) : Subject!E
{
public:
    /++
     + Subscribes an observer.
     +
     + While the subject is not stopped the observer is appended to the
     + observer list. Once stopped, the observer is notified immediately with
     + the stored exception, or with the stored value (if any) followed by
     + completed().
     +
     + Params:
     +   observer = the observer to add.
     + Returns: a Subscription while the subject is not stopped, otherwise
     +          NopDisposable.instance.
     +/
    Disposable subscribe(Observer!E observer)
    {
        Exception ex = null;
        E value;
        bool hasValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                _observers ~= observer;
                return subscription(this, observer);
            }

            // Snapshot the terminal state; the observer is notified below,
            // outside the synchronized block.
            ex = _exception;
            hasValue = _hasValue;
            value = _value;
        }

        if (ex !is null)
        {
            observer.failure(ex);
        }
        else
        {
            if (hasValue)
                .put(observer, value);
            observer.completed();
        }

        return NopDisposable.instance;
    }

    /++
     + Wraps an arbitrary observer with observerObject!E and subscribes it.
     +
     + Params:
     +   observer = the value passed to observerObject!E.
     + Returns: the result of subscribe(Observer!E).
     +/
    auto subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /++
     + Removes the first occurrence of the observer from the observer list.
     + A null observer, or one that is not in the list, is ignored.
     +
     + Params:
     +   observer = the observer to remove.
     +/
    void unsubscribe(Observer!E observer)
    {
        if (observer is null)
            return;

        synchronized (this)
        {
            import std.algorithm : remove, countUntil;

            auto index = countUntil(_observers, observer);
            if (index != -1)
            {
                _observers = remove(_observers, index);
            }
        }
    }

public:
    /++
     + Stores the value as the latest one; observers are not notified here.
     + Ignored once the subject is stopped.
     +
     + Params:
     +   value = the value to remember.
     +/
    void put(E value)
    {
        synchronized (this)
        {
            if (!_isStopped)
            {
                _value = value;
                _hasValue = true;
            }
        }
    }

    /++
     + Stops the subject and notifies every current observer: the stored value
     + is put first when one exists, then completed() is called.
     + Does nothing if the subject is already stopped.
     +/
    void completed()
    {
        Observer!E[] os = null;

        E value;
        bool hasValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                os = _observers;
                _observers.length = 0;
                _isStopped = true;
                value = _value;
                hasValue = _hasValue;
            }
        }

        // os stays null when the subject was already stopped, so the loop
        // body is never entered in that case.
        foreach (observer; os)
        {
            if (hasValue)
                .put(observer, value);
            observer.completed();
        }
    }

    /++
     + Stops the subject, stores the exception and calls failure(e) on every
     + current observer. Does nothing if the subject is already stopped.
     +
     + Params:
     +   e = the exception to forward; asserted to be non-null.
     +/
    void failure(Exception e)
    {
        assert(e !is null);

        Observer!E[] os = null;
        synchronized (this)
        {
            if (!_isStopped)
            {
                os = _observers;
                _observers.length = 0;
                _isStopped = true;
                _exception = e;
            }
        }

        // os stays null when the subject was already stopped.
        foreach (observer; os)
        {
            observer.failure(e);
        }
    }

private:
    Observer!E[] _observers; // observers subscribed while not stopped
    bool _isStopped; // set by completed() and failure()
    E _value; // latest value passed to put()
    bool _hasValue; // true once put() has stored a value
    Exception _exception; // set by failure()
}

unittest
{
    auto sub = new AsyncSubject!int;

    .put(sub, 1);
    sub.completed();

    auto observer = new CounterObserver!int;

    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 1);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.put(100);

    assert(observer.hasNotBeenCalled);

    assert(sub._hasValue);
    assert(sub._value == 100);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);

    assert(sub._hasValue);
    assert(sub._value == 100);

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

unittest
{

    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    auto ex = new Exception("TEST");
    sub.failure(ex);

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto ex = new Exception("TEST");
    sub.failure(ex);

    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.completed();
    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

version (unittest)
{
    // Test helper: counts each kind of notification and remembers the last
    // value and the last exception it received.
    class CounterObserver(T) : Observer!T
    {
    public:
        size_t putCount;
        size_t completedCount;
        size_t failureCount;
        T lastValue;
        Exception lastException;

    public:
        // True while all three counters are still zero.
        bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
        {
            return putCount == 0 && completedCount == 0 && failureCount == 0;
        }

    public:
        void put(T obj)
        {
            putCount++;
            lastValue = obj;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
            lastException = e;
        }
    }
}

/++
 + A subject that holds a current value, pushes that value to every new
 + subscriber, and forwards value changes to an inner SubjectObject.
 +/
class BehaviorSubject(E) : Subject!E
{
public:
    /// Creates a subject whose current value is E.init.
    this()
    {
        this(E.init);
    }

    /++
     + Creates a subject with the given current value.
     +
     + Params:
     +   value = the initial current value.
     +/
    this(E value)
    {
        _subject = new SubjectObject!E;
        _value = value;
    }

public:
    /// Returns: the current value.
    inout(E) value() inout @property
    {
        return _value;
    }

    /++
     + Sets the current value. The value is stored and put to the inner
     + subject only when it differs from the stored one.
     +
     + Params:
     +   value = the new current value.
     +/
    void value(E value) @property
    {
        if (_value != value)
        {
            _value = value;
            .put(_subject, value);
        }
    }

public:
    /++
     + Puts the current value to the observer, then subscribes it to the
     + inner subject through doSubscribe.
     +
     + Params:
     +   observer = the observer to add.
     + Returns: the result of doSubscribe on the inner subject.
     +/
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, value);
        return _subject.doSubscribe(observer);
    }

    /++
     + Puts the current value to the observer, then subscribes it to the
     + inner subject through doSubscribe.
     +
     + Params:
     +   observer = the observer to add.
     + Returns: the doSubscribe result wrapped with disposableObject.
     +/
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, value);
        return disposableObject(_subject.doSubscribe(observer));
    }

    /++
     + Assigns obj through the value property setter.
     +
     + Params:
     +   obj = the new current value.
     +/
    void put(E obj)
    {
        value = obj;
    }

    /// Forwards completed() to the inner subject.
    void completed()
    {
        _subject.completed();
    }

    /++
     + Forwards failure(e) to the inner subject.
     +
     + Params:
     +   e = the exception to forward.
     +/
    void failure(Exception e)
    {
        _subject.failure(e);
    }

private:
    SubjectObject!E _subject; // inner subject that holds the subscribers
    E _value; // current value
}

unittest
{
    static assert(isObservable!(BehaviorSubject!int, int));
    static assert(is(BehaviorSubject!int.ElementType == int));
}

unittest
{
    int num = 0;
    auto subject = new BehaviorSubject!int(100);

    auto d = subject.doSubscribe((int n) { num = n; });
    assert(num == 100);

    .put(subject, 1);
    assert(num == 1);

    d.dispose();
    .put(subject, 10);
    assert(num == 1);
}