/// Schedulers that decide where and when an operation runs, plus the
/// `observeOn` / `subscribeOn` adapters that route observable traffic
/// through a scheduler.
module rx.scheduler;

import rx.disposable;
import rx.observer;
import rx.observable;

import core.time;
import core.thread : Thread;
import std.range : put;
import std.parallelism : TaskPool, taskPool, task;

// Note:
//  On a single-core machine the default taskPool has no worker threads.
//  Some asynchronous algorithms do not work as expected in that case,
//  so at least one worker thread is reserved at module start-up.
//  Define the version identifier Disable_ReservePoolThreadsOnSingleCore to opt out.
version (Disable_ReservePoolThreadsOnSingleCore)
{
}
else
{
    shared static this()
    {
        import std.parallelism : defaultPoolThreads;

        if (defaultPoolThreads == 0)
            defaultPoolThreads = 1;
    }
}

/// Something that can run an operation.
interface Scheduler
{
    /// Runs `op`. Where and when it runs is up to the implementation.
    void start(void delegate() op);
}

/// A scheduler that can additionally run an operation after a delay.
interface AsyncScheduler : Scheduler
{
    /**
    Runs `op` after the delay `val`.

    Returns: a token that can be used to cancel the pending operation.
    */
    CancellationToken schedule(void delegate() op, Duration val);
}

// Builds the delegate shared by the delayed `schedule` implementations below:
// it waits until `target`, then invokes `op`. The token is checked once before
// the wait and once after it; a cancellation does not interrupt the sleep itself.
private void delegate() delayedOperation(void delegate() op, MonoTime target,
        CancellationToken token)
{
    return delegate void() {
        if (token.isCanceled)
            return;
        auto dt = target - MonoTime.currTime;
        // A zero or negative remaining time means the deadline already passed.
        if (dt > Duration.zero)
            Thread.sleep(dt);
        if (!token.isCanceled)
            op();
    };
}

/// Scheduler that invokes the operation immediately, in the caller's context.
class LocalScheduler : Scheduler
{
public:
    /// Calls `op` directly before returning.
    void start(void delegate() op)
    {
        op();
    }
}

/// Scheduler that runs every operation on a newly created `core.thread.Thread`.
class ThreadScheduler : AsyncScheduler
{
    /// Creates a new thread for `op` and starts it.
    void start(void delegate() op)
    {
        auto t = new Thread(op);
        t.start();
    }

    /**
    Starts a new thread that sleeps until `val` has elapsed (measured from this
    call) and then invokes `op`, unless the returned token was canceled.

    Returns: the token controlling the pending operation.
    */
    CancellationToken schedule(void delegate() op, Duration val)
    {
        auto target = MonoTime.currTime + val;
        auto c = new CancellationToken;
        start(delayedOperation(op, target, c));
        return c;
    }
}

unittest
{
    import std.stdio : writeln;

    writeln("Testing ThreadScheduler...");
    scope (exit)
        writeln("ThreadScheduler test is completed.");

    import rx.util : EventSignal;

    auto scheduler = new ThreadScheduler;
    auto signal = new EventSignal;
    auto done = false;
    auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);

    signal.wait();
    assert(done);
    assert(!c.isCanceled);
}

/// Scheduler that submits every operation to a `std.parallelism.TaskPool`.
class TaskPoolScheduler : AsyncScheduler
{
public:
    /**
    Params:
        pool = the pool to submit work to; when `null`, the global
               `std.parallelism.taskPool` is used.
    */
    this(TaskPool pool = null)
    {
        if (pool is null)
            pool = taskPool;

        _pool = pool;
    }

public:
    /// Wraps `op` in a task and puts it on the pool.
    void start(void delegate() op)
    {
        _pool.put(task(op));
    }

    /**
    Puts a task on the pool that sleeps until `val` has elapsed (measured from
    this call) and then invokes `op`, unless the returned token was canceled.

    Returns: the token controlling the pending operation.
    */
    CancellationToken schedule(void delegate() op, Duration val)
    {
        auto target = MonoTime.currTime + val;
        auto c = new CancellationToken;
        start(delayedOperation(op, target, c));
        return c;
    }

private:
    TaskPool _pool;
}

unittest
{
    import std.stdio : writeln;
    import std.parallelism : totalCPUs, defaultPoolThreads;

    writeln("Testing TaskPoolScheduler...");
    scope (exit)
        writeln("TaskPoolScheduler test is completed.");

    version (OSX)
    {
        writeln("totalCPUs: ", totalCPUs);
        writeln("defaultPoolThreads: ", defaultPoolThreads);
    }

    import rx.util : EventSignal;

    auto scheduler = new TaskPoolScheduler;
    auto signal = new EventSignal;
    auto done = false;
    auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);

    signal.wait();
    assert(done);
    assert(!c.isCanceled);
}

/**
Wraps another `AsyncScheduler` and shortens every scheduled delay by an
accumulated offset, which is advanced with `roll`.
*/
class HistoricalScheduler(T) : AsyncScheduler
{
    static assert(is(T : AsyncScheduler));

public:
    /// Wraps `innerScheduler`; the offset starts at zero.
    this(T innerScheduler)
    {
        _offset = Duration.zero;
        _innerScheduler = innerScheduler;
    }

public:
    /// Forwards `op` to the inner scheduler unchanged.
    void start(void delegate() op)
    {
        _innerScheduler.start(op);
    }

    /// Schedules `op` on the inner scheduler with the delay `val` minus the current offset.
    CancellationToken schedule(void delegate() op, Duration val)
    {
        return _innerScheduler.schedule(op, val - _offset);
    }

    /// Advances the offset by `val`.
    void roll(Duration val)
    {
        _offset += val;
    }

private:
    T _innerScheduler;
    Duration _offset;
}

/// Convenience constructor for `HistoricalScheduler` that infers the inner scheduler type.
HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
{
    return new typeof(return)(scheduler);
}

unittest
{
    import std.stdio : writeln;

    writeln("Testing HistoricalScheduler...");
    scope (exit)
        writeln("HistoricalScheduler test is completed.");

    void test(AsyncScheduler scheduler)
    {
        import rx.util : EventSignal;

        bool done = false;
        auto signal = new EventSignal;

        auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100));
        assert(!done);

        signal.wait();
        assert(done);
        assert(!c.isCanceled);
    }

    //test(new ThreadScheduler);
    //test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// A canceled operation must not run.
unittest
{
    void test(AsyncScheduler scheduler)
    {
        bool done = false;
        auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50));
        c.cancel();
        Thread.sleep(dur!"msecs"(100));
        assert(!done);
    }

    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// After rolling past the requested delay, the operation runs without waiting.
unittest
{
    import std.typetuple : TypeTuple;

    foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler))
    {
        auto scheduler = historicalScheduler(new T);

        scheduler.roll(dur!"seconds"(20));

        auto done = false;
        auto c = scheduler.schedule(() { done = true; }, dur!"seconds"(10));
        Thread.sleep(dur!"msecs"(10)); // wait for a context switch
        assert(done);
    }
}

// HistoricalScheduler requires an AsyncScheduler as its inner scheduler.
unittest
{
    static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; }));
}

/**
Observer wrapper that delivers each notification to the wrapped observer
through `TScheduler.start`.

When the wrapped observer supports `failure`, an `Exception` thrown from its
`put` is reported to `failure` and the supplied disposable is disposed.
*/
struct ObserveOnObserver(TObserver, TScheduler, E)
{
public:
    static if (hasFailure!TObserver)
    {
        /// `disposable` is disposed when the wrapped observer's `put` throws.
        this(TObserver observer, TScheduler scheduler, Disposable disposable)
        {
            _observer = observer;
            _scheduler = scheduler;
            _disposable = disposable;
        }
    }
    else
    {
        ///
        this(TObserver observer, TScheduler scheduler)
        {
            _observer = observer;
            _scheduler = scheduler;
        }
    }
public:
    /// Hands `obj` to the wrapped observer via the scheduler.
    void put(E obj)
    {
        _scheduler.start({
            static if (hasFailure!TObserver)
            {
                try
                {
                    _observer.put(obj);
                }
                catch (Exception e)
                {
                    _observer.failure(e);
                    _disposable.dispose();
                }
            }
            else
            {
                _observer.put(obj);
            }
        });
    }

    static if (hasCompleted!TObserver)
    {
        /// Forwards completion to the wrapped observer via the scheduler.
        void completed()
        {
            _scheduler.start({ _observer.completed(); });
        }
    }
    static if (hasFailure!TObserver)
    {
        /// Forwards the failure to the wrapped observer via the scheduler.
        void failure(Exception e)
        {
            _scheduler.start({ _observer.failure(e); });
        }
    }
private:
    TObserver _observer;
    TScheduler _scheduler;
    static if (hasFailure!TObserver)
    {
        Disposable _disposable;
    }
}

/// Observable whose subscribers receive their notifications through `TScheduler`.
struct ObserveOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;
public:
    ///
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /**
    Subscribes `observer` to the source, wrapped in an `ObserveOnObserver`.

    For observers that support `failure`, the returned disposable is also the
    one handed to the wrapper, so a throwing `put` ends the subscription.
    */
    auto subscribe(TObserver)(TObserver observer)
    {
        alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);
        static if (hasFailure!TObserver)
        {
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    ObserverType(observer, _scheduler, disposable))));
            return disposable;
        }
        else
        {
            return doSubscribe(_observable, ObserverType(observer, _scheduler));
        }
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}

unittest
{
    alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto scheduler = new LocalScheduler;

    auto scheduled = TestObservable(sub, scheduler);

    auto flag1 = false;
    auto d = scheduled.subscribe((int n) { flag1 = true; });
    scope (exit)
        d.dispose();
    .put(sub, 1);
    assert(flag1);

    auto flag2 = false;
    auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
    scope (exit)
        d2.dispose();
    .put(sub, 2);
    assert(flag2);
}

/// Returns an observable that delivers the notifications of `observable` through `scheduler`.
ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, TScheduler scheduler)
{
    return typeof(return)(observable, scheduler);
}

unittest
{
    import std.concurrency;
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    import std.array : appender;

    auto buf = appender!(int[]);
    auto observer = observerObject!int(buf);

    auto d1 = scheduled.subscribe(buf);
    auto d2 = scheduled.subscribe(observer);

    subject.put(0);
    assert(buf.data.length == 2);

    subject.put(1);
    assert(buf.data.length == 4);
}

// Every combination of optional observer members must be accepted.
unittest
{
    import std.concurrency;
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    struct ObserverA
    {
        void put(int n)
        {
        }
    }

    struct ObserverB
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    struct ObserverC
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    struct ObserverD
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    scheduled.doSubscribe(ObserverA());
    scheduled.doSubscribe(ObserverB());
    scheduled.doSubscribe(ObserverC());
    scheduled.doSubscribe(ObserverD());

    subject.put(1);
    subject.completed();
}

/// Observable that performs the subscription to its source through `TScheduler`.
class SubscribeOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;

public:
    ///
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /**
    Asks the scheduler to subscribe `observer` to the source.

    Returns: a disposable that receives the real subscription once the
    scheduled subscribe call has run.
    */
    auto subscribe(TObserver)(TObserver observer)
    {
        auto disposable = new SingleAssignmentDisposable;
        _scheduler.start({
            auto temp = doSubscribe(_observable, observer);
            disposable.setDisposable(disposableObject(temp));
        });
        return disposable;
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}

unittest
{
    alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto scheduler = new LocalScheduler;

    auto scheduled = new TestObservable(sub, scheduler);

    auto flag1 = false;
    auto d = scheduled.subscribe((int n) { flag1 = true; });
    scope (exit)
        d.dispose();
    .put(sub, 1);
    assert(flag1);

    auto flag2 = false;
    auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
    scope (exit)
        d2.dispose();
    .put(sub, 2);
    assert(flag2);
}

/// Returns an observable that subscribes to `observable` through `scheduler`.
SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, auto ref TScheduler scheduler)
{
    return new typeof(return)(observable, scheduler);
}
///
unittest
{
    import rx.observable : defer;

    auto sub = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });
    auto scheduler = new LocalScheduler;

    auto scheduled = sub.subscribeOn(scheduler);

    int value = 0;
    auto d = scheduled.doSubscribe((int n) { value = n; });
    scope (exit)
        d.dispose();

    assert(value == 100);
}
///
unittest
{
    import rx.observable : defer;
    import rx.util : EventSignal;

    auto sub = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });
    auto scheduler = new TaskPoolScheduler;
    auto scheduled = sub.subscribeOn(scheduler);

    int value = 0;
    auto signal = new EventSignal;
    auto d = scheduled.subscribe((int n) { value = n; signal.setSignal(); });
    scope (exit)
        d.dispose();

    signal.wait();
    assert(value == 100);
}

unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto buf = Appender!(int[])();
    auto data = [1, 2, 3, 4];

    auto event = new EventSignal;
    auto observer = (int n) {
        buf.put(n);
        if (n == 4)
            event.setSignal();
    };
    data.asObservable().subscribeOn(new ThreadScheduler).subscribe(observer);

    event.wait();

    assert(equal(buf.data, data));
}

unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto buf = Appender!(int[])();
    auto data = [1, 2, 3, 4];

    auto event = new EventSignal;
    auto observer = (int n) {
        buf.put(n);
        if (n == 4)
            event.setSignal();
    };
    data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe(observer);

    event.wait();

    assert(equal(buf.data, data));
}

unittest
{
    import rx.util : EventSignal;

    auto data = [1, 2, 3, 4];
    auto event = new EventSignal();

    data.asObservable().subscribeOn(new ThreadScheduler).subscribe((int a) {
        if (a == 4)
            event.setSignal();
    });

    event.wait();
}

unittest
{
    import rx.util : EventSignal;

    auto data = [1, 2, 3, 4];
    auto event = new EventSignal();

    data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe((int a) {
        if (a == 4)
            event.setSignal();
    });

    event.wait();
}

// Every scheduler must eventually run all N started operations.
unittest
{
    import core.atomic;
    import core.sync.condition;
    import std.typetuple;
    import rx.util : EventSignal;

    enum N = 4;

    void test(Scheduler scheduler)
    {
        auto signal = new EventSignal;
        shared count = 0;
        foreach (n; 0 .. N)
        {
            scheduler.start(() {
                atomicOp!"+="(count, 1);
                Thread.sleep(dur!"msecs"(50));
                if (atomicLoad(count) == N)
                    signal.setSignal();
            });
        }
        signal.wait();
        assert(count == N);
    }

    test(new LocalScheduler);
    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}

// Process-wide current scheduler; initialised to a TaskPoolScheduler at start-up.
private __gshared Scheduler s_scheduler;
shared static this()
{
    s_scheduler = new TaskPoolScheduler;
}

/// Returns the process-wide current scheduler.
Scheduler currentScheduler() @property
{
    return s_scheduler;
}

/// Replaces the process-wide current scheduler and returns the new one.
TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property
{
    s_scheduler = scheduler;
    return scheduler;
}

unittest
{
    Scheduler s = currentScheduler;
    scope (exit)
        currentScheduler = s;

    TaskPoolScheduler s1 = new TaskPoolScheduler;
    TaskPoolScheduler s2 = currentScheduler = s1;
    assert(s2 is s1);
}