1 /++ 2 + This module defines the concept of Scheduler. 3 +/ 4 module rx.scheduler; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 10 import core.time; 11 import core.thread : Thread; 12 import std.range : put; 13 import std.parallelism : TaskPool, taskPool, task; 14 15 //Note: 16 // In single core, taskPool's worker are not initialized. 17 // Some asynchronous algorithm does not work as expected, so at least 1 thread is reserved. 18 version (Disable_ReservePoolThreadsOnSingleCore) 19 { 20 } 21 else 22 { 23 shared static this() 24 { 25 import std.parallelism : defaultPoolThreads; 26 27 if (defaultPoolThreads == 0) 28 defaultPoolThreads = 1; 29 } 30 } 31 32 /// 33 enum isScheduler(T) = is(typeof({ 34 T scheduler = void; 35 36 void delegate() work = null; 37 scheduler.start(work); 38 })); 39 40 /// 41 unittest 42 { 43 static assert(isScheduler!Scheduler); 44 static assert(isScheduler!LocalScheduler); 45 } 46 47 /// 48 enum isAsyncScheduler(T) = isScheduler!T && is(typeof({ 49 T scheduler = void; 50 51 void delegate() work = null; 52 Duration time = void; 53 CancellationToken token = scheduler.schedule(work, time); 54 })); 55 56 /// 57 unittest 58 { 59 static assert(!isAsyncScheduler!Scheduler); 60 static assert(!isAsyncScheduler!LocalScheduler); 61 62 static assert(isAsyncScheduler!AsyncScheduler); 63 static assert(isAsyncScheduler!ThreadScheduler); 64 static assert(isAsyncScheduler!TaskPoolScheduler); 65 static assert(isAsyncScheduler!(HistoricalScheduler!ThreadScheduler)); 66 static assert(isAsyncScheduler!(HistoricalScheduler!TaskPoolScheduler)); 67 } 68 69 /// 70 interface Scheduler 71 { 72 /// 73 void start(void delegate() op); 74 } 75 76 /// 77 interface AsyncScheduler : Scheduler 78 { 79 /// 80 CancellationToken schedule(void delegate() op, Duration val); 81 } 82 83 /// 84 class LocalScheduler : Scheduler 85 { 86 public: 87 /// 88 void start(void delegate() op) 89 { 90 op(); 91 } 92 } 93 94 /// 95 class ThreadScheduler : AsyncScheduler 96 { 97 /// 98 void start(void delegate() op) 99 { 100 auto t = new Thread(op); 101 t.start(); 102 } 103 104 /// 105 CancellationToken schedule(void delegate() op, Duration val) 106 { 107 auto target = MonoTime.currTime + val; 108 auto c = new CancellationToken; 109 start({ 110 if (c.isCanceled) 111 return; 112 auto dt = target - MonoTime.currTime; 113 if (dt > Duration.zero) 114 Thread.sleep(dt); 115 if (!c.isCanceled) 116 op(); 117 }); 118 return c; 119 } 120 } 121 122 unittest 123 { 124 import std.stdio : writeln; 125 126 writeln("Testing ThreadScheduler..."); 127 scope (exit) 128 writeln("ThreadScheduler test is completed."); 129 130 import rx.util : EventSignal; 131 132 auto scheduler = new ThreadScheduler; 133 auto signal = new EventSignal; 134 auto done = false; 135 auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs); 136 137 signal.wait(); 138 assert(done); 139 assert(!c.isCanceled); 140 } 141 142 /// 143 class TaskPoolScheduler : AsyncScheduler 144 { 145 public: 146 /// 147 this(TaskPool pool = null) 148 { 149 if (pool is null) 150 pool = taskPool; 151 152 _pool = pool; 153 } 154 155 public: 156 /// 157 void start(void delegate() op) 158 { 159 _pool.put(task(op)); 160 } 161 162 /// 163 CancellationToken schedule(void delegate() op, Duration val) 164 { 165 auto target = MonoTime.currTime + val; 166 auto c = new CancellationToken; 167 start({ 168 if (c.isCanceled) 169 return; 170 auto dt = target - MonoTime.currTime; 171 if (dt > Duration.zero) 172 Thread.sleep(dt); 173 if (!c.isCanceled) 174 op(); 175 }); 176 return c; 177 } 178 179 private: 180 TaskPool _pool; 181 } 182 183 unittest 184 { 185 import std.stdio : writeln; 186 import std.parallelism : totalCPUs, defaultPoolThreads; 187 188 writeln("Testing TaskPoolScheduler..."); 189 scope (exit) 190 writeln("TaskPoolScheduler test is completed."); 191 192 version (OSX) 193 { 194 writeln("totalCPUs: ", totalCPUs); 195 writeln("defaultPoolThreads: ", defaultPoolThreads); 196 } 197 198 import rx.util : EventSignal; 199 200 auto scheduler = new TaskPoolScheduler; 201 auto signal = new EventSignal; 202 auto done = false; 203 auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs); 204 205 signal.wait(); 206 assert(done); 207 assert(!c.isCanceled); 208 } 209 210 /// 211 class HistoricalScheduler(T) : AsyncScheduler 212 { 213 static assert(is(T : AsyncScheduler)); 214 215 public: 216 /// 217 this(T innerScheduler) 218 { 219 _offset = Duration.zero; 220 _innerScheduler = innerScheduler; 221 } 222 223 public: 224 /// 225 void start(void delegate() op) 226 { 227 _innerScheduler.start(op); 228 } 229 230 /// 231 CancellationToken schedule(void delegate() op, Duration val) 232 { 233 return _innerScheduler.schedule(op, val - _offset); 234 } 235 236 void roll(Duration val) 237 { 238 _offset += val; 239 } 240 241 private: 242 T _innerScheduler; 243 Duration _offset; 244 } 245 /// 246 HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler) 247 { 248 return new typeof(return)(scheduler); 249 } 250 251 unittest 252 { 253 import std.stdio : writeln; 254 255 writeln("Testing HistoricalScheduler..."); 256 scope (exit) 257 writeln("HistoricalScheduler test is completed."); 258 259 void test(AsyncScheduler scheduler) 260 { 261 import rx.util : EventSignal; 262 263 bool done = false; 264 auto signal = new EventSignal; 265 266 auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100)); 267 assert(!done); 268 269 signal.wait(); 270 assert(done); 271 assert(!c.isCanceled); 272 } 273 274 //test(new ThreadScheduler); 275 //test(new TaskPoolScheduler); 276 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 277 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 278 } 279 280 unittest 281 { 282 void test(AsyncScheduler scheduler) 283 { 284 bool done = false; 285 auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50)); 286 c.cancel(); 287 Thread.sleep(dur!"msecs"(100)); 288 assert(!done); 289 } 290 291 test(new ThreadScheduler); 292 test(new TaskPoolScheduler); 293 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 294 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 295 } 296 297 unittest 298 { 299 import std.typetuple : TypeTuple; 300 301 foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler)) 302 { 303 auto scheduler = historicalScheduler(new T); 304 305 scheduler.roll(dur!"seconds"(20)); 306 307 auto done = false; 308 auto c = scheduler.schedule(() { done = true; }, dur!"seconds"(10)); 309 Thread.sleep(dur!"msecs"(10)); // wait for a context switch 310 assert(done); 311 } 312 } 313 314 unittest 315 { 316 static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; })); 317 } 318 319 /// 320 template MostDerivedScheduler(T) 321 { 322 static assert(isScheduler!T); 323 324 static if (isAsyncScheduler!T) 325 { 326 /// 327 alias MostDerivedScheduler = AsyncScheduler; 328 } 329 else 330 { 331 /// 332 alias MostDerivedScheduler = Scheduler; 333 } 334 } 335 336 /// 337 unittest 338 { 339 alias S1 = MostDerivedScheduler!Scheduler; 340 alias S2 = MostDerivedScheduler!AsyncScheduler; 341 alias S3 = MostDerivedScheduler!LocalScheduler; 342 alias S4 = MostDerivedScheduler!ThreadScheduler; 343 alias S5 = MostDerivedScheduler!TaskPoolScheduler; 344 alias S6 = MostDerivedScheduler!(HistoricalScheduler!ThreadScheduler); 345 alias S7 = MostDerivedScheduler!(HistoricalScheduler!TaskPoolScheduler); 346 347 static assert(is(S1 == Scheduler)); 348 static assert(is(S2 == AsyncScheduler)); 349 static assert(is(S3 == Scheduler)); 350 static assert(is(S4 == AsyncScheduler)); 351 static assert(is(S5 == AsyncScheduler)); 352 static assert(is(S6 == AsyncScheduler)); 353 static assert(is(S7 == AsyncScheduler)); 354 } 355 356 /// 357 final class SchedulerObject(TScheduler) : MostDerivedScheduler!TScheduler 358 { 359 private: 360 TScheduler scheduler; 361 362 public: 363 /// 364 this(TScheduler scheduler) 365 { 366 this.scheduler = scheduler; 367 } 368 369 /// 370 this(ref TScheduler scheduler) 371 { 372 this.scheduler = scheduler; 373 } 374 375 /// 376 void start(void delegate() op) 377 { 378 scheduler.start(op); 379 } 380 381 static if (isAsyncScheduler!TScheduler) 382 { 383 /// 384 CancellationToken schedule(void delegate() op, Duration val) 385 { 386 return scheduler.schedule(op, val); 387 } 388 } 389 } 390 391 /// 392 MostDerivedScheduler!TScheduler schedulerObject(TScheduler)(auto ref TScheduler scheduler) 393 { 394 static if (is(MostDerivedScheduler!TScheduler == AsyncScheduler)) 395 { 396 static if (is(TScheduler : AsyncScheduler)) 397 return scheduler; 398 else 399 return new SchedulerObject!TScheduler(scheduler); 400 } 401 else static if (is(MostDerivedScheduler!TScheduler == Scheduler)) 402 { 403 static if (is(TScheduler : Scheduler)) 404 return scheduler; 405 else 406 return new SchedulerObject!TScheduler(scheduler); 407 } 408 else 409 static assert(false); 410 } 411 412 /// 413 unittest 414 { 415 struct MyScheduler 416 { 417 void start(void delegate() op) 418 { 419 } 420 } 421 422 class MyClassScheduler 423 { 424 void start(void delegate() op) 425 { 426 } 427 } 428 429 class MyClassDerivedScheduler : Scheduler 430 { 431 void start(void delegate() op) 432 { 433 } 434 } 435 436 struct MyAsyncScheduler 437 { 438 void start(void delegate() op) 439 { 440 } 441 442 CancellationToken schedule(void delegate() op, Duration val) 443 { 444 return null; 445 } 446 } 447 448 class MyClassAsyncScheduler 449 { 450 void start(void delegate() op) 451 { 452 } 453 454 CancellationToken schedule(void delegate() op, Duration val) 455 { 456 return null; 457 } 458 } 459 460 class MyClassPartAsyncScheduler : Scheduler 461 { 462 void start(void delegate() op) 463 { 464 } 465 466 CancellationToken schedule(void delegate() op, Duration val) 467 { 468 return null; 469 } 470 } 471 472 class MyClassDerivedAsyncScheduler : AsyncScheduler 473 { 474 void start(void delegate() op) 475 { 476 } 477 478 CancellationToken schedule(void delegate() op, Duration val) 479 { 480 return null; 481 } 482 } 483 484 auto s1 = MyScheduler(); 485 auto s2 = new MyClassScheduler; 486 auto s3 = new MyClassDerivedScheduler; 487 auto s4 = MyAsyncScheduler(); 488 auto s5 = new MyClassAsyncScheduler; 489 auto s6 = new MyClassPartAsyncScheduler; 490 auto s7 = new MyClassDerivedAsyncScheduler; 491 492 Scheduler t1 = s1.schedulerObject(); 493 Scheduler t2 = s2.schedulerObject(); 494 Scheduler t3 = s3.schedulerObject(); 495 AsyncScheduler t4 = s4.schedulerObject(); 496 AsyncScheduler t5 = s5.schedulerObject(); 497 AsyncScheduler t6 = s6.schedulerObject(); 498 AsyncScheduler t7 = s7.schedulerObject(); 499 500 assert(t1 !is null); 501 assert(t2 !is null); 502 assert(t3 !is null); 503 assert(t4 !is null); 504 assert(t5 !is null); 505 assert(t6 !is null); 506 assert(t7 !is null); 507 508 assert(t3 is s3); 509 assert(t7 is s7); 510 } 511 512 /// 513 unittest 514 { 515 struct MyScheduler 516 { 517 void start(void delegate() op) 518 { 519 op(); 520 } 521 } 522 523 MyScheduler scheduler; 524 Scheduler wrapped = scheduler.schedulerObject(); 525 assert(wrapped !is null); 526 } 527 528 /// 529 struct ObserveOnObserver(TObserver, TScheduler, E) 530 { 531 public: 532 static if (hasFailure!TObserver) 533 { 534 /// 535 this(TObserver observer, TScheduler scheduler, Disposable disposable) 536 { 537 _observer = observer; 538 _scheduler = scheduler; 539 _disposable = disposable; 540 } 541 } 542 else 543 { 544 /// 545 this(TObserver observer, TScheduler scheduler) 546 { 547 _observer = observer; 548 _scheduler = scheduler; 549 } 550 } 551 public: 552 /// 553 void put(E obj) 554 { 555 _scheduler.start({ 556 static if (hasFailure!TObserver) 557 { 558 try 559 { 560 _observer.put(obj); 561 } 562 catch (Exception e) 563 { 564 _observer.failure(e); 565 _disposable.dispose(); 566 } 567 } 568 else 569 { 570 _observer.put(obj); 571 } 572 }); 573 } 574 575 static if (hasCompleted!TObserver) 576 { 577 /// 578 void completed() 579 { 580 _scheduler.start({ _observer.completed(); }); 581 } 582 } 583 static if (hasFailure!TObserver) 584 { 585 /// 586 void failure(Exception e) 587 { 588 _scheduler.start({ _observer.failure(e); }); 589 } 590 } 591 private: 592 TObserver _observer; 593 TScheduler _scheduler; 594 static if (hasFailure!TObserver) 595 { 596 Disposable _disposable; 597 } 598 } 599 600 /// 601 struct ObserveOnObservable(TObservable, TScheduler : Scheduler) 602 { 603 alias ElementType = TObservable.ElementType; 604 public: 605 /// 606 this(TObservable observable, TScheduler scheduler) 607 { 608 _observable = observable; 609 _scheduler = scheduler; 610 } 611 612 public: 613 /// 614 auto subscribe(TObserver)(TObserver observer) 615 { 616 alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType); 617 static if (hasFailure!TObserver) 618 { 619 auto disposable = new SingleAssignmentDisposable; 620 disposable.setDisposable(disposableObject(doSubscribe(_observable, 621 ObserverType(observer, _scheduler, disposable)))); 622 return disposable; 623 } 624 else 625 { 626 return doSubscribe(_observable, ObserverType(observer, _scheduler)); 627 } 628 } 629 630 private: 631 TObservable _observable; 632 TScheduler _scheduler; 633 } 634 635 unittest 636 { 637 alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler); 638 static assert(isObservable!(TestObservable, int)); 639 640 import rx.subject : SubjectObject; 641 642 auto sub = new SubjectObject!int; 643 auto scheduler = new LocalScheduler; 644 645 auto scheduled = TestObservable(sub, scheduler); 646 647 auto flag1 = false; 648 auto d = scheduled.subscribe((int n) { flag1 = true; }); 649 scope (exit) 650 d.dispose(); 651 .put(sub, 1); 652 assert(flag1); 653 654 auto flag2 = false; 655 auto d2 = scheduled.doSubscribe((int n) { flag2 = true; }); 656 scope (exit) 657 d2.dispose(); 658 .put(sub, 2); 659 assert(flag2); 660 } 661 662 /// 663 ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)( 664 auto ref TObservable observable, TScheduler scheduler) 665 { 666 return typeof(return)(observable, scheduler); 667 } 668 669 /// 670 unittest 671 { 672 import std.concurrency; 673 import rx.subject; 674 675 auto subject = new SubjectObject!int; 676 auto scheduler = new LocalScheduler; 677 auto scheduled = subject.observeOn(scheduler); 678 679 import std.array : appender; 680 681 auto buf = appender!(int[]); 682 auto observer = observerObject!int(buf); 683 684 auto d1 = scheduled.subscribe(buf); 685 auto d2 = scheduled.subscribe(observer); 686 687 subject.put(0); 688 assert(buf.data.length == 2); 689 690 subject.put(1); 691 assert(buf.data.length == 4); 692 } 693 694 unittest 695 { 696 import std.concurrency; 697 import rx.subject; 698 699 auto subject = new SubjectObject!int; 700 auto scheduler = new LocalScheduler; 701 auto scheduled = subject.observeOn(scheduler); 702 703 struct ObserverA 704 { 705 void put(int n) 706 { 707 } 708 } 709 710 struct ObserverB 711 { 712 void put(int n) 713 { 714 } 715 716 void completed() 717 { 718 } 719 } 720 721 struct ObserverC 722 { 723 void put(int n) 724 { 725 } 726 727 void failure(Exception e) 728 { 729 } 730 } 731 732 struct ObserverD 733 { 734 void put(int n) 735 { 736 } 737 738 void completed() 739 { 740 } 741 742 void failure(Exception e) 743 { 744 } 745 } 746 747 scheduled.doSubscribe(ObserverA()); 748 scheduled.doSubscribe(ObserverB()); 749 scheduled.doSubscribe(ObserverC()); 750 scheduled.doSubscribe(ObserverD()); 751 752 subject.put(1); 753 subject.completed(); 754 } 755 756 /// 757 class SubscribeOnObservable(TObservable, TScheduler : Scheduler) 758 { 759 alias ElementType = TObservable.ElementType; 760 761 public: 762 /// 763 this(TObservable observable, TScheduler scheduler) 764 { 765 _observable = observable; 766 _scheduler = scheduler; 767 } 768 769 public: 770 /// 771 auto subscribe(TObserver)(TObserver observer) 772 { 773 auto disposable = new SingleAssignmentDisposable; 774 _scheduler.start({ 775 auto temp = doSubscribe(_observable, observer); 776 disposable.setDisposable(disposableObject(temp)); 777 }); 778 return disposable; 779 } 780 781 private: 782 TObservable _observable; 783 TScheduler _scheduler; 784 } 785 786 unittest 787 { 788 alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler); 789 static assert(isObservable!(TestObservable, int)); 790 791 import rx.subject : SubjectObject; 792 793 auto sub = new SubjectObject!int; 794 auto scheduler = new LocalScheduler; 795 796 auto scheduled = new TestObservable(sub, scheduler); 797 798 auto flag1 = false; 799 auto d = scheduled.subscribe((int n) { flag1 = true; }); 800 scope (exit) 801 d.dispose(); 802 .put(sub, 1); 803 assert(flag1); 804 805 auto flag2 = false; 806 auto d2 = scheduled.doSubscribe((int n) { flag2 = true; }); 807 scope (exit) 808 d2.dispose(); 809 .put(sub, 2); 810 assert(flag2); 811 } 812 813 /// 814 SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)( 815 auto ref TObservable observable, auto ref TScheduler scheduler) 816 { 817 return new typeof(return)(observable, scheduler); 818 } 819 /// 820 unittest 821 { 822 import rx.observable : defer; 823 824 auto sub = defer!int((Observer!int observer) { 825 .put(observer, 100); 826 return NopDisposable.instance; 827 }); 828 auto scheduler = new LocalScheduler; 829 830 auto scheduled = sub.subscribeOn(scheduler); 831 832 int value = 0; 833 auto d = scheduled.doSubscribe((int n) { value = n; }); 834 scope (exit) 835 d.dispose(); 836 837 assert(value == 100); 838 } 839 /// 840 unittest 841 { 842 import rx.observable : defer; 843 import rx.util : EventSignal; 844 845 auto sub = defer!int((Observer!int observer) { 846 .put(observer, 100); 847 return NopDisposable.instance; 848 }); 849 auto scheduler = new TaskPoolScheduler; 850 auto scheduled = sub.subscribeOn(scheduler); 851 852 int value = 0; 853 auto signal = new EventSignal; 854 auto d = scheduled.subscribe((int n) { value = n; signal.setSignal(); }); 855 scope (exit) 856 d.dispose(); 857 858 signal.wait(); 859 assert(value == 100); 860 } 861 862 unittest 863 { 864 import std.algorithm : equal; 865 import std.array : Appender; 866 import rx.util : EventSignal; 867 868 auto buf = Appender!(int[])(); 869 auto data = [1, 2, 3, 4]; 870 871 auto event = new EventSignal; 872 auto observer = (int n) { 873 buf.put(n); 874 if (n == 4) 875 event.setSignal(); 876 }; 877 data.asObservable().subscribeOn(new ThreadScheduler).subscribe(observer); 878 879 event.wait(); 880 881 assert(equal(buf.data, data)); 882 } 883 884 unittest 885 { 886 import std.algorithm : equal; 887 import std.array : Appender; 888 import rx.util : EventSignal; 889 890 auto buf = Appender!(int[])(); 891 auto data = [1, 2, 3, 4]; 892 893 auto event = new EventSignal; 894 auto observer = (int n) { 895 buf.put(n); 896 if (n == 4) 897 event.setSignal(); 898 }; 899 data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe(observer); 900 901 event.wait(); 902 903 assert(equal(buf.data, data)); 904 } 905 906 unittest 907 { 908 import rx.util : EventSignal; 909 910 auto data = [1, 2, 3, 4]; 911 auto event = new EventSignal(); 912 913 data.asObservable().subscribeOn(new ThreadScheduler).subscribe((int a) { 914 if (a == 4) 915 event.setSignal(); 916 }); 917 918 event.wait(); 919 } 920 921 unittest 922 { 923 import rx.util : EventSignal; 924 925 auto data = [1, 2, 3, 4]; 926 auto event = new EventSignal(); 927 928 data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe((int a) { 929 if (a == 4) 930 event.setSignal(); 931 }); 932 933 event.wait(); 934 } 935 936 unittest 937 { 938 import core.atomic; 939 import core.sync.condition; 940 import std.typetuple; 941 import rx.util : EventSignal; 942 943 enum N = 4; 944 945 void test(Scheduler scheduler) 946 { 947 auto signal = new EventSignal; 948 shared count = 0; 949 foreach (n; 0 .. N) 950 { 951 scheduler.start(() { 952 atomicOp!"+="(count, 1); 953 Thread.sleep(dur!"msecs"(50)); 954 if (atomicLoad(count) == N) 955 signal.setSignal(); 956 }); 957 } 958 signal.wait(); 959 assert(count == N); 960 } 961 962 test(new LocalScheduler); 963 test(new ThreadScheduler); 964 test(new TaskPoolScheduler); 965 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 966 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 967 } 968 969 private __gshared Scheduler s_scheduler; 970 shared static this() 971 { 972 s_scheduler = new TaskPoolScheduler; 973 } 974 975 /// 976 Scheduler currentScheduler() @property 977 { 978 return s_scheduler; 979 } 980 981 /// 982 TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property 983 { 984 s_scheduler = scheduler; 985 return scheduler; 986 } 987 988 unittest 989 { 990 Scheduler s = currentScheduler; 991 scope (exit) 992 currentScheduler = s; 993 994 TaskPoolScheduler s1 = new TaskPoolScheduler; 995 TaskPoolScheduler s2 = currentScheduler = s1; 996 assert(s2 is s1); 997 }