1 /+++++++++++++++++++++++++++++ 2 + This module defines the Subject and some implements. 3 +/ 4 module rx.subject; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 10 import core.atomic : atomicLoad, cas; 11 import std.range : put; 12 13 ///Represents an object that is both an observable sequence as well as an observer. 14 interface Subject(E) : Observer!E, Observable!E 15 { 16 } 17 18 ///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers. 19 class SubjectObject(E) : Subject!E 20 { 21 alias ElementType = E; 22 public: 23 this() 24 { 25 _observer = cast(shared) NopObserver!E.instance; 26 } 27 28 public: 29 /// 30 void put(E obj) 31 { 32 auto temp = atomicLoad(_observer); 33 .put(temp, obj); 34 } 35 /// 36 void completed() 37 { 38 shared(Observer!E) oldObserver = void; 39 shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance; 40 Observer!E temp = void; 41 do 42 { 43 oldObserver = _observer; 44 temp = atomicLoad(oldObserver); 45 if (cast(DoneObserver!E) temp) 46 break; 47 } 48 while (!cas(&_observer, oldObserver, newObserver)); 49 temp.completed(); 50 } 51 /// 52 void failure(Exception error) 53 { 54 shared(Observer!E) oldObserver = void; 55 shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error); 56 Observer!E temp = void; 57 do 58 { 59 oldObserver = _observer; 60 temp = atomicLoad(oldObserver); 61 if (cast(DoneObserver!E) temp) 62 break; 63 } 64 while (!cas(&_observer, oldObserver, newObserver)); 65 temp.failure(error); 66 } 67 68 /// 69 Disposable subscribe(T)(T observer) 70 { 71 return subscribe(observerObject!E(observer)); 72 } 73 /// 74 Disposable subscribe(Observer!E observer) 75 { 76 shared(Observer!E) oldObserver = void; 77 shared(Observer!E) newObserver = void; 78 do 79 { 80 oldObserver = _observer; 81 auto temp = atomicLoad(oldObserver); 82 83 if (temp is DoneObserver!E.instance) 84 { 85 observer.completed(); 86 return NopDisposable.instance; 87 } 88 89 if (auto fail = cast(DoneObserver!E) temp) 90 { 91 observer.failure(fail.exception); 92 return NopDisposable.instance; 93 } 94 95 if (auto composite = cast(CompositeObserver!E) temp) 96 { 97 newObserver = cast(shared) composite.add(observer); 98 } 99 else if (auto nop = cast(NopObserver!E) temp) 100 { 101 newObserver = cast(shared) observer; 102 } 103 else 104 { 105 newObserver = cast(shared)(new CompositeObserver!E([temp, observer])); 106 } 107 } 108 while (!cas(&_observer, oldObserver, newObserver)); 109 110 return subscription(this, observer); 111 } 112 113 /// 114 void unsubscribe(Observer!E observer) 115 { 116 shared(Observer!E) oldObserver = void; 117 shared(Observer!E) newObserver = void; 118 do 119 { 120 oldObserver = _observer; 121 122 auto temp = atomicLoad(oldObserver); 123 if (auto composite = cast(CompositeObserver!E) temp) 124 { 125 newObserver = cast(shared) composite.remove(observer); 126 } 127 else 128 { 129 if (temp !is observer) 130 return; 131 132 newObserver = cast(shared) NopObserver!E.instance; 133 } 134 } 135 while (!cas(&_observer, oldObserver, newObserver)); 136 } 137 138 private: 139 shared(Observer!E) _observer; 140 } 141 142 /// 143 unittest 144 { 145 import std.array : appender; 146 147 auto data = appender!(int[])(); 148 auto subject = new SubjectObject!int; 149 auto disposable = subject.subscribe(observerObject!(int)(data)); 150 assert(disposable !is null); 151 subject.put(0); 152 subject.put(1); 153 154 import std.algorithm : equal; 155 156 assert(equal(data.data, [0, 1])); 157 158 disposable.dispose(); 159 subject.put(2); 160 assert(equal(data.data, [0, 1])); 161 } 162 163 unittest 164 { 165 static assert(isObserver!(SubjectObject!int, int)); 166 static assert(isObservable!(SubjectObject!int, int)); 167 static assert(!isObservable!(SubjectObject!int, string)); 168 static assert(!isObservable!(SubjectObject!int, string)); 169 } 170 171 unittest 172 { 173 auto subject = new SubjectObject!int; 174 auto observer = new CounterObserver!int; 175 auto disposable = subject.subscribe(observer); 176 scope (exit) 177 disposable.dispose(); 178 179 subject.put(0); 180 subject.put(1); 181 182 assert(observer.putCount == 2); 183 subject.completed(); 184 subject.put(2); 185 assert(observer.putCount == 2); 186 assert(observer.completedCount == 1); 187 } 188 189 unittest 190 { 191 auto subject = new SubjectObject!int; 192 auto observer = new CounterObserver!int; 193 auto disposable = subject.subscribe(observer); 194 scope (exit) 195 disposable.dispose(); 196 197 subject.put(0); 198 subject.put(1); 199 200 assert(observer.putCount == 2); 201 auto ex = new Exception("Exception"); 202 subject.failure(ex); 203 subject.put(2); 204 assert(observer.putCount == 2); 205 assert(observer.failureCount == 1); 206 assert(observer.lastException is ex); 207 } 208 209 unittest 210 { 211 import std.array : appender; 212 213 auto buf1 = appender!(int[]); 214 auto buf2 = appender!(int[]); 215 auto subject = new SubjectObject!int; 216 subject.subscribe(observerObject!(int)(buf1)); 217 subject.doSubscribe((int n) => buf2.put(n)); 218 219 assert(buf1.data.length == 0); 220 assert(buf2.data.length == 0); 221 subject.put(0); 222 assert(buf1.data.length == 1); 223 assert(buf2.data.length == 1); 224 assert(buf1.data[0] == buf2.data[0]); 225 } 226 227 unittest 228 { 229 auto sub = new SubjectObject!int; 230 sub.completed(); 231 232 auto observer = new CounterObserver!int; 233 assert(observer.putCount == 0); 234 assert(observer.completedCount == 0); 235 assert(observer.failureCount == 0); 236 sub.subscribe(observer); 237 assert(observer.putCount == 0); 238 assert(observer.completedCount == 1); 239 assert(observer.failureCount == 0); 240 } 241 242 unittest 243 { 244 auto sub = new SubjectObject!int; 245 auto ex = new Exception("Exception"); 246 sub.failure(ex); 247 248 auto observer = new CounterObserver!int; 249 assert(observer.putCount == 0); 250 assert(observer.completedCount == 0); 251 assert(observer.failureCount == 0); 252 sub.subscribe(observer); 253 assert(observer.putCount == 0); 254 assert(observer.completedCount == 0); 255 assert(observer.failureCount == 1); 256 assert(observer.lastException is ex); 257 } 258 259 private class Subscription(TSubject, TObserver) : Disposable 260 { 261 public: 262 this(TSubject subject, TObserver observer) 263 { 264 _subject = subject; 265 _observer = observer; 266 } 267 268 public: 269 void dispose() 270 { 271 if (_subject !is null) 272 { 273 _subject.unsubscribe(_observer); 274 _subject = null; 275 } 276 } 277 278 private: 279 TSubject _subject; 280 TObserver _observer; 281 } 282 283 private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)( 284 TSubject subject, TObserver observer) 285 { 286 return new typeof(return)(subject, observer); 287 } 288 289 /// 290 class AsyncSubject(E) : Subject!E 291 { 292 public: 293 /// 294 Disposable subscribe(Observer!E observer) 295 { 296 Exception ex = null; 297 E value; 298 bool hasValue = false; 299 300 synchronized (this) 301 { 302 if (!_isStopped) 303 { 304 _observers ~= observer; 305 return subscription(this, observer); 306 } 307 308 ex = _exception; 309 hasValue = _hasValue; 310 value = _value; 311 } 312 313 if (ex !is null) 314 { 315 observer.failure(ex); 316 } 317 else if (hasValue) 318 { 319 .put(observer, value); 320 observer.completed(); 321 } 322 else 323 { 324 observer.completed(); 325 } 326 327 return NopDisposable.instance; 328 } 329 330 /// 331 auto subscribe(T)(T observer) 332 { 333 return subscribe(observerObject!E(observer)); 334 } 335 336 /// 337 void unsubscribe(Observer!E observer) 338 { 339 if (observer is null) 340 return; 341 342 synchronized (this) 343 { 344 import std.algorithm : remove, countUntil; 345 346 auto index = countUntil(_observers, observer); 347 if (index != -1) 348 { 349 _observers = remove(_observers, index); 350 } 351 } 352 } 353 354 public: 355 /// 356 void put(E value) 357 { 358 synchronized (this) 359 { 360 if (!_isStopped) 361 { 362 _value = value; 363 _hasValue = true; 364 } 365 } 366 } 367 368 /// 369 void completed() 370 { 371 Observer!E[] os = null; 372 373 E value; 374 bool hasValue = false; 375 376 synchronized (this) 377 { 378 if (!_isStopped) 379 { 380 os = _observers; 381 _observers.length = 0; 382 _isStopped = true; 383 value = _value; 384 hasValue = _hasValue; 385 } 386 } 387 388 if (os) 389 { 390 if (hasValue) 391 { 392 foreach (observer; os) 393 { 394 .put(observer, value); 395 observer.completed(); 396 } 397 } 398 else 399 { 400 foreach (observer; os) 401 { 402 observer.completed(); 403 } 404 } 405 } 406 } 407 408 /// 409 void failure(Exception e) 410 { 411 assert(e !is null); 412 413 Observer!E[] os = null; 414 synchronized (this) 415 { 416 if (!_isStopped) 417 { 418 os = _observers; 419 _observers.length = 0; 420 _isStopped = true; 421 _exception = e; 422 } 423 } 424 425 if (os) 426 { 427 foreach (observer; os) 428 { 429 observer.failure(e); 430 } 431 } 432 } 433 434 private: 435 Observer!E[] _observers; 436 bool _isStopped; 437 E _value; 438 bool _hasValue; 439 Exception _exception; 440 } 441 442 unittest 443 { 444 auto sub = new AsyncSubject!int; 445 446 .put(sub, 1); 447 sub.completed(); 448 449 auto observer = new CounterObserver!int; 450 451 assert(observer.hasNotBeenCalled); 452 453 sub.subscribe(observer); 454 455 assert(observer.putCount == 1); 456 assert(observer.completedCount == 1); 457 assert(observer.failureCount == 0); 458 assert(observer.lastValue == 1); 459 } 460 461 unittest 462 { 463 auto sub = new AsyncSubject!int; 464 auto observer = new CounterObserver!int; 465 466 auto d = sub.subscribe(observer); 467 scope (exit) 468 d.dispose(); 469 470 assert(observer.hasNotBeenCalled); 471 472 sub.put(100); 473 474 assert(observer.hasNotBeenCalled); 475 476 assert(sub._hasValue); 477 assert(sub._value == 100); 478 479 sub.completed(); 480 481 assert(observer.putCount == 1); 482 assert(observer.completedCount == 1); 483 assert(observer.failureCount == 0); 484 assert(observer.lastValue == 100); 485 } 486 487 unittest 488 { 489 auto sub = new AsyncSubject!int; 490 auto observer = new CounterObserver!int; 491 492 sub.put(100); 493 494 assert(sub._hasValue); 495 assert(sub._value == 100); 496 497 auto d = sub.subscribe(observer); 498 scope (exit) 499 d.dispose(); 500 501 assert(observer.hasNotBeenCalled); 502 503 sub.completed(); 504 505 assert(observer.putCount == 1); 506 assert(observer.completedCount == 1); 507 assert(observer.failureCount == 0); 508 assert(observer.lastValue == 100); 509 } 510 511 unittest 512 { 513 auto sub = new AsyncSubject!int; 514 auto observer = new CounterObserver!int; 515 516 auto d = sub.subscribe(observer); 517 518 d.dispose(); 519 assert(observer.hasNotBeenCalled); 520 521 sub.put(100); 522 assert(observer.hasNotBeenCalled); 523 524 sub.completed(); 525 assert(observer.hasNotBeenCalled); 526 } 527 528 unittest 529 { 530 auto sub = new AsyncSubject!int; 531 auto observer = new CounterObserver!int; 532 533 auto d = sub.subscribe(observer); 534 assert(observer.hasNotBeenCalled); 535 536 sub.put(100); 537 assert(observer.hasNotBeenCalled); 538 539 d.dispose(); 540 assert(observer.hasNotBeenCalled); 541 542 sub.completed(); 543 assert(observer.hasNotBeenCalled); 544 } 545 546 unittest 547 { 548 549 auto sub = new AsyncSubject!int; 550 auto observer = new CounterObserver!int; 551 552 sub.put(100); 553 assert(observer.hasNotBeenCalled); 554 555 auto d = sub.subscribe(observer); 556 assert(observer.hasNotBeenCalled); 557 558 d.dispose(); 559 assert(observer.hasNotBeenCalled); 560 561 sub.completed(); 562 assert(observer.hasNotBeenCalled); 563 } 564 565 unittest 566 { 567 auto sub = new AsyncSubject!int; 568 auto observer = new CounterObserver!int; 569 570 auto d = sub.subscribe(observer); 571 scope (exit) 572 d.dispose(); 573 574 assert(observer.hasNotBeenCalled); 575 576 sub.completed(); 577 578 assert(observer.putCount == 0); 579 assert(observer.completedCount == 1); 580 assert(observer.failureCount == 0); 581 } 582 583 unittest 584 { 585 auto sub = new AsyncSubject!int; 586 auto observer = new CounterObserver!int; 587 588 auto d = sub.subscribe(observer); 589 scope (exit) 590 d.dispose(); 591 592 assert(observer.hasNotBeenCalled); 593 594 auto ex = new Exception("TEST"); 595 sub.failure(ex); 596 597 assert(observer.putCount == 0); 598 assert(observer.completedCount == 0); 599 assert(observer.failureCount == 1); 600 assert(observer.lastException is ex); 601 } 602 603 unittest 604 { 605 auto sub = new AsyncSubject!int; 606 auto ex = new Exception("TEST"); 607 sub.failure(ex); 608 609 auto observer = new CounterObserver!int; 610 611 auto d = sub.subscribe(observer); 612 scope (exit) 613 d.dispose(); 614 615 assert(observer.putCount == 0); 616 assert(observer.completedCount == 0); 617 assert(observer.failureCount == 1); 618 assert(observer.lastException is ex); 619 } 620 621 unittest 622 { 623 auto sub = new AsyncSubject!int; 624 auto observer = new CounterObserver!int; 625 626 sub.completed(); 627 assert(observer.hasNotBeenCalled); 628 629 sub.subscribe(observer); 630 assert(observer.putCount == 0); 631 assert(observer.completedCount == 1); 632 assert(observer.failureCount == 0); 633 } 634 635 version (unittest) 636 { 637 class CounterObserver(T) : Observer!T 638 { 639 public: 640 size_t putCount; 641 size_t completedCount; 642 size_t failureCount; 643 T lastValue; 644 Exception lastException; 645 646 public: 647 bool hasNotBeenCalled() const pure nothrow @nogc @safe @property 648 { 649 return putCount == 0 && completedCount == 0 && failureCount == 0; 650 } 651 652 public: 653 void put(T obj) 654 { 655 putCount++; 656 lastValue = obj; 657 } 658 659 void completed() 660 { 661 completedCount++; 662 } 663 664 void failure(Exception e) 665 { 666 failureCount++; 667 lastException = e; 668 } 669 } 670 }