/++
 + This module defines the concept of Scheduler.
 +
 + A scheduler decides where a unit of work (a `void delegate()`) is executed:
 + inline on the caller, on a new thread, or on a task pool. The module also
 + provides the `observeOn` and `subscribeOn` operators, which route observer
 + notifications or the subscription itself through a scheduler.
 +/
module rx.scheduler;

import rx.disposable;
import rx.observer;
import rx.observable;

import core.time;
import core.thread : Thread;
import std.range : put;
import std.parallelism : TaskPool, taskPool, task;

// Note:
//   When defaultPoolThreads is 0 (the single-core case), taskPool has no
//   worker threads and some asynchronous algorithms do not work as expected.
//   At least one pool thread is therefore reserved at start-up, unless the
//   Disable_ReservePoolThreadsOnSingleCore version is set.
version (Disable_ReservePoolThreadsOnSingleCore)
{
}
else
{
    shared static this()
    {
        import std.parallelism : defaultPoolThreads;

        if (defaultPoolThreads == 0)
            defaultPoolThreads = 1;
    }
}

/++
 + The basic scheduler abstraction: something that can execute an operation.
 +/
interface Scheduler
{
    /// Executes `op` according to the policy of the implementing scheduler.
    void start(void delegate() op);
}

/++
 + A scheduler that can additionally execute an operation after a delay.
 +/
interface AsyncScheduler : Scheduler
{
    /++
     + Schedules `op` to be executed once `val` has elapsed.
     +
     + Params:
     +   op = the operation to execute
     +   val = the delay before `op` is executed
     +
     + Returns: the token associated with the scheduled operation; the
     + implementations in this module skip `op` when the token has been
     + canceled before the delay elapses.
     +/
    CancellationToken schedule(void delegate() op, Duration val);
}

/++
 + A scheduler that executes the operation immediately on the calling thread.
 +/
class LocalScheduler : Scheduler
{
public:
    /// Calls `op` synchronously.
    void start(void delegate() op)
    {
        op();
    }
}

// Builds the operation shared by the delayed `schedule` implementations.
// The returned delegate does nothing when `token` is already canceled;
// otherwise it sleeps until `target` (if that is still in the future) and
// then calls `op`, unless `token` was canceled during the sleep.
private void delegate() delayedOperation(void delegate() op, MonoTime target,
        CancellationToken token)
{
    return () {
        if (token.isCanceled)
            return;
        auto dt = target - MonoTime.currTime;
        // A non-positive remainder means the target time has already passed.
        if (dt > Duration.zero)
            Thread.sleep(dt);
        if (!token.isCanceled)
            op();
    };
}

/++
 + A scheduler that executes every operation on a newly created thread.
 +/
class ThreadScheduler : AsyncScheduler
{
    /// Creates a new `Thread` for `op` and starts it.
    void start(void delegate() op)
    {
        auto t = new Thread(op);
        t.start();
    }

    /++
     + Starts a new thread that sleeps until `val` has elapsed (measured from
     + the time of this call) and then calls `op`.
     +
     + Returns: a new token; `op` is skipped when the token is canceled before
     + the thread starts working or while it sleeps.
     +/
    CancellationToken schedule(void delegate() op, Duration val)
    {
        auto target = MonoTime.currTime + val;
        auto c = new CancellationToken;
        start(delayedOperation(op, target, c));
        return c;
    }
}

unittest
{
    import std.stdio : writeln;

    writeln("Testing ThreadScheduler...");
    scope (exit)
        writeln("ThreadScheduler test is completed.");

    import rx.util : EventSignal;

    auto scheduler = new ThreadScheduler;
    auto signal = new EventSignal;
    auto done = false;
    auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);

    signal.wait();
    assert(done);
    assert(!c.isCanceled);
}

/++
 + A scheduler that executes operations as tasks on a `TaskPool`.
 +/
class TaskPoolScheduler : AsyncScheduler
{
public:
    /++
     + Params:
     +   pool = the pool to submit tasks to; when `null`, the global
     +          `taskPool` is used
     +/
    this(TaskPool pool = null)
    {
        if (pool is null)
            pool = taskPool;

        _pool = pool;
    }

public:
    /// Wraps `op` in a task and puts it on the pool.
    void start(void delegate() op)
    {
        _pool.put(task(op));
    }

    /++
     + Puts a task on the pool that sleeps until `val` has elapsed (measured
     + from the time of this call) and then calls `op`. The sleep happens
     + inside the task itself.
     +
     + Returns: a new token; `op` is skipped when the token is canceled before
     + the task starts working or while it sleeps.
     +/
    CancellationToken schedule(void delegate() op, Duration val)
    {
        auto target = MonoTime.currTime + val;
        auto c = new CancellationToken;
        start(delayedOperation(op, target, c));
        return c;
    }

private:
    TaskPool _pool;
}

unittest
{
    import std.stdio : writeln;
    import std.parallelism : totalCPUs, defaultPoolThreads;

    writeln("Testing TaskPoolScheduler...");
    scope (exit)
        writeln("TaskPoolScheduler test is completed.");

    version (OSX)
    {
        writeln("totalCPUs: ", totalCPUs);
        writeln("defaultPoolThreads: ", defaultPoolThreads);
    }

    import rx.util : EventSignal;

    auto scheduler = new TaskPoolScheduler;
    auto signal = new EventSignal;
    auto done = false;
    auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);

    signal.wait();
    assert(done);
    assert(!c.isCanceled);
}

/++
 + A scheduler that wraps another `AsyncScheduler` and shortens every
 + scheduled delay by an offset accumulated through `roll`.
 +
 + `T` must be implicitly convertible to `AsyncScheduler`.
 +/
class HistoricalScheduler(T) : AsyncScheduler
{
    static assert(is(T : AsyncScheduler));

public:
    /// Wraps `innerScheduler`; the offset starts at zero.
    this(T innerScheduler)
    {
        _offset = Duration.zero;
        _innerScheduler = innerScheduler;
    }

public:
    /// Forwards `op` to the inner scheduler unchanged.
    void start(void delegate() op)
    {
        _innerScheduler.start(op);
    }

    /++
     + Forwards to the inner scheduler with the delay reduced by the current
     + offset. The resulting delay may be zero or negative.
     +/
    CancellationToken schedule(void delegate() op, Duration val)
    {
        return _innerScheduler.schedule(op, val - _offset);
    }

    /// Adds `val` to the offset applied to subsequent `schedule` calls.
    void roll(Duration val)
    {
        _offset += val;
    }

private:
    T _innerScheduler;
    Duration _offset;
}

/++
 + Convenience factory that creates a `HistoricalScheduler` around `scheduler`.
 +/
HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
{
    return new typeof(return)(scheduler);
}

unittest
{
    import std.stdio : writeln;

    writeln("Testing HistoricalScheduler...");
    scope (exit)
        writeln("HistoricalScheduler test is completed.");

    void test(AsyncScheduler scheduler)
    {
        import rx.util : EventSignal;

        bool done = false;
        auto signal = new EventSignal;

        auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100));
        assert(!done);

        signal.wait();
        assert(done);
        assert(!c.isCanceled);
    }

    //test(new ThreadScheduler);
    //test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// A canceled token must prevent the scheduled operation from running.
unittest
{
    void test(AsyncScheduler scheduler)
    {
        bool done = false;
        auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50));
        c.cancel();
        Thread.sleep(dur!"msecs"(100));
        assert(!done);
    }

    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// After rolling 20 seconds, an operation scheduled 10 seconds ahead is
// already due and must run without waiting for the unshifted delay.
unittest
{
    import std.meta : AliasSeq;
    import rx.util : EventSignal;

    foreach (T; AliasSeq!(ThreadScheduler, TaskPoolScheduler))
    {
        auto scheduler = historicalScheduler(new T);

        scheduler.roll(dur!"seconds"(20));

        auto signal = new EventSignal;
        auto done = false;
        immutable begin = MonoTime.currTime;
        scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"seconds"(10));

        // Wait for the worker instead of sleeping a fixed time, so a slow
        // context switch cannot fail the test spuriously.
        signal.wait();
        assert(done);
        // Without the offset the operation could not have run before 10 s.
        assert(MonoTime.currTime - begin < dur!"seconds"(5));
    }
}

unittest
{
    static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; }));
}

/++
 + An observer wrapper that forwards every notification to the wrapped
 + observer through `TScheduler.start`.
 +
 + `completed` and `failure` are only provided when `TObserver` provides them
 + (as reported by `hasCompleted` / `hasFailure`, which are declared outside
 + this module). When `TObserver` has `failure`, an `Exception` thrown by its
 + `put` is passed to `failure` and the given disposable is disposed.
 +/
struct ObserveOnObserver(TObserver, TScheduler, E)
{
public:
    static if (hasFailure!TObserver)
    {
        /++
         + Params:
         +   observer = the observer that receives the notifications
         +   scheduler = the scheduler every notification is started on
         +   disposable = disposed when `observer.put` throws an `Exception`
         +/
        this(TObserver observer, TScheduler scheduler, Disposable disposable)
        {
            _observer = observer;
            _scheduler = scheduler;
            _disposable = disposable;
        }
    }
    else
    {
        /++
         + Params:
         +   observer = the observer that receives the notifications
         +   scheduler = the scheduler every notification is started on
         +/
        this(TObserver observer, TScheduler scheduler)
        {
            _observer = observer;
            _scheduler = scheduler;
        }
    }
public:
    /// Starts an operation on the scheduler that passes `obj` to the observer.
    void put(E obj)
    {
        _scheduler.start({
            static if (hasFailure!TObserver)
            {
                try
                {
                    _observer.put(obj);
                }
                catch (Exception e)
                {
                    // Report the error to the observer and end the subscription.
                    _observer.failure(e);
                    _disposable.dispose();
                }
            }
            else
            {
                _observer.put(obj);
            }
        });
    }

    static if (hasCompleted!TObserver)
    {
        /// Starts an operation on the scheduler that calls `completed` on the observer.
        void completed()
        {
            _scheduler.start({ _observer.completed(); });
        }
    }
    static if (hasFailure!TObserver)
    {
        /// Starts an operation on the scheduler that passes `e` to `failure` on the observer.
        void failure(Exception e)
        {
            _scheduler.start({ _observer.failure(e); });
        }
    }
private:
    TObserver _observer;
    TScheduler _scheduler;
    static if (hasFailure!TObserver)
    {
        Disposable _disposable;
    }
}

/++
 + An observable that delivers the notifications of `TObservable` to its
 + observers through a scheduler (see `ObserveOnObserver`).
 +/
struct ObserveOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;
public:
    /++
     + Params:
     +   observable = the source observable
     +   scheduler = the scheduler notifications are started on
     +/
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /++
     + Subscribes `observer` to the source, wrapped in an `ObserveOnObserver`.
     +
     + When `TObserver` has `failure`, the returned `SingleAssignmentDisposable`
     + is also handed to the wrapper, so that an exception thrown by
     + `observer.put` disposes the subscription.
     +
     + Returns: the disposable of the subscription.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);
        static if (hasFailure!TObserver)
        {
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    ObserverType(observer, _scheduler, disposable))));
            return disposable;
        }
        else
        {
            return doSubscribe(_observable, ObserverType(observer, _scheduler));
        }
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}

unittest
{
    alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto scheduler = new LocalScheduler;

    auto scheduled = TestObservable(sub, scheduler);

    auto flag1 = false;
    auto d = scheduled.subscribe((int n) { flag1 = true; });
    scope (exit)
        d.dispose();
    .put(sub, 1);
    assert(flag1);

    auto flag2 = false;
    auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
    scope (exit)
        d2.dispose();
    .put(sub, 2);
    assert(flag2);
}

/++
 + Creates an observable that delivers the notifications of `observable`
 + through `scheduler`.
 +
 + Params:
 +   observable = the source observable
 +   scheduler = the scheduler notifications are started on
 +
 + Returns: an `ObserveOnObservable` wrapping `observable`.
 +/
ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, TScheduler scheduler)
{
    return typeof(return)(observable, scheduler);
}

///
unittest
{
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    import std.array : appender;

    auto buf = appender!(int[]);
    auto observer = observerObject!int(buf);

    auto d1 = scheduled.subscribe(buf);
    auto d2 = scheduled.subscribe(observer);

    subject.put(0);
    assert(buf.data.length == 2);

    subject.put(1);
    assert(buf.data.length == 4);
}

// observeOn must accept observers with every combination of
// completed / failure handlers.
unittest
{
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    struct ObserverA
    {
        void put(int n)
        {
        }
    }

    struct ObserverB
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    struct ObserverC
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    struct ObserverD
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    scheduled.doSubscribe(ObserverA());
    scheduled.doSubscribe(ObserverB());
    scheduled.doSubscribe(ObserverC());
    scheduled.doSubscribe(ObserverD());

    subject.put(1);
    subject.completed();
}

/++
 + An observable that performs the subscription to `TObservable` through a
 + scheduler.
 +/
class SubscribeOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;

public:
    /++
     + Params:
     +   observable = the source observable
     +   scheduler = the scheduler the subscription is started on
     +/
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /++
     + Starts an operation on the scheduler that subscribes `observer` to the
     + source.
     +
     + Returns: a `SingleAssignmentDisposable` that receives the disposable of
     + the subscription once the scheduled operation has subscribed.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        auto disposable = new SingleAssignmentDisposable;
        _scheduler.start({
            auto temp = doSubscribe(_observable, observer);
            disposable.setDisposable(disposableObject(temp));
        });
        return disposable;
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}

unittest
{
    alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto scheduler = new LocalScheduler;

    auto scheduled = new TestObservable(sub, scheduler);

    auto flag1 = false;
    auto d = scheduled.subscribe((int n) { flag1 = true; });
    scope (exit)
        d.dispose();
    .put(sub, 1);
    assert(flag1);

    auto flag2 = false;
    auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
    scope (exit)
        d2.dispose();
    .put(sub, 2);
    assert(flag2);
}

/++
 + Creates an observable that subscribes to `observable` through `scheduler`.
 +
 + Params:
 +   observable = the source observable
 +   scheduler = the scheduler the subscription is started on
 +
 + Returns: a new `SubscribeOnObservable` wrapping `observable`.
 +/
SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, auto ref TScheduler scheduler)
{
    return new typeof(return)(observable, scheduler);
}
///
unittest
{
    import rx.observable : defer;

    auto sub = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });
    auto scheduler = new LocalScheduler;

    auto scheduled = sub.subscribeOn(scheduler);

    int value = 0;
    auto d = scheduled.doSubscribe((int n) { value = n; });
    scope (exit)
        d.dispose();

    assert(value == 100);
}
///
unittest
{
    import rx.observable : defer;
    import rx.util : EventSignal;

    auto sub = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });
    auto scheduler = new TaskPoolScheduler;
    auto scheduled = sub.subscribeOn(scheduler);

    int value = 0;
    auto signal = new EventSignal;
    auto d = scheduled.subscribe((int n) { value = n; signal.setSignal(); });
    scope (exit)
        d.dispose();

    signal.wait();
    assert(value == 100);
}

unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto buf = Appender!(int[])();
    auto data = [1, 2, 3, 4];

    auto event = new EventSignal;
    auto observer = (int n) {
        buf.put(n);
        if (n == 4)
            event.setSignal();
    };
    data.asObservable().subscribeOn(new ThreadScheduler).subscribe(observer);

    event.wait();

    assert(equal(buf.data, data));
}

unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto buf = Appender!(int[])();
    auto data = [1, 2, 3, 4];

    auto event = new EventSignal;
    auto observer = (int n) {
        buf.put(n);
        if (n == 4)
            event.setSignal();
    };
    data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe(observer);

    event.wait();

    assert(equal(buf.data, data));
}

unittest
{
    import rx.util : EventSignal;

    auto data = [1, 2, 3, 4];
    auto event = new EventSignal();

    data.asObservable().subscribeOn(new ThreadScheduler).subscribe((int a) {
        if (a == 4)
            event.setSignal();
    });

    event.wait();
}

unittest
{
    import rx.util : EventSignal;

    auto data = [1, 2, 3, 4];
    auto event = new EventSignal();

    data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe((int a) {
        if (a == 4)
            event.setSignal();
    });

    event.wait();
}

// Every scheduler must run all N started operations.
unittest
{
    import core.atomic;
    import rx.util : EventSignal;

    enum N = 4;

    void test(Scheduler scheduler)
    {
        auto signal = new EventSignal;
        shared count = 0;
        foreach (n; 0 .. N)
        {
            scheduler.start(() {
                atomicOp!"+="(count, 1);
                Thread.sleep(dur!"msecs"(50));
                if (atomicLoad(count) == N)
                    signal.setSignal();
            });
        }
        signal.wait();
        // Read the shared counter atomically, like the workers do.
        assert(atomicLoad(count) == N);
    }

    test(new LocalScheduler);
    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// The process-wide current scheduler; a TaskPoolScheduler until replaced.
private __gshared Scheduler s_scheduler;
shared static this()
{
    s_scheduler = new TaskPoolScheduler;
}

/++
 + Returns: the process-wide current scheduler, which is a `TaskPoolScheduler`
 + unless it has been replaced through the setter overload.
 +/
Scheduler currentScheduler() @property
{
    return s_scheduler;
}

/++
 + Replaces the process-wide current scheduler.
 +
 + The scheduler is stored in a `__gshared` variable and the assignment is not
 + synchronized.
 +
 + Params:
 +   scheduler = the scheduler to install
 +
 + Returns: `scheduler`, with its static type preserved.
 +/
TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property
{
    s_scheduler = scheduler;
    return scheduler;
}

unittest
{
    Scheduler s = currentScheduler;
    scope (exit)
        currentScheduler = s;

    TaskPoolScheduler s1 = new TaskPoolScheduler;
    TaskPoolScheduler s2 = currentScheduler = s1;
    assert(s2 is s1);
}