1 /++
2  + This module defines the concept of Scheduler.
3  +/
4 module rx.scheduler;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 
10 import core.time;
11 import core.thread : Thread;
12 import std.range : put;
13 import std.parallelism : TaskPool, taskPool, task;
14 
// Note:
// On a single-core machine, taskPool's worker threads are not initialized.
// Some asynchronous algorithms do not work as expected in that case, so at least one thread is reserved.
// Compile with -version=Disable_ReservePoolThreadsOnSingleCore to skip the adjustment below.
version (Disable_ReservePoolThreadsOnSingleCore)
{
    // Intentionally empty: std.parallelism's default is left untouched.
}
else
{
    // Module constructor: raises the default pool size to 1 when it would otherwise be 0.
    shared static this()
    {
        static import std.parallelism;

        immutable configured = std.parallelism.defaultPoolThreads;
        if (configured == 0)
        {
            std.parallelism.defaultPoolThreads = 1;
        }
    }
}
31 
/// Evaluates to `true` when a value of type `T` has a `start` member
/// that can be called with a `void delegate()` argument.
template isScheduler(T)
{
    enum bool isScheduler = is(typeof({
                T candidate = void;

                void delegate() job = null;
                candidate.start(job);
            }));
}
39 
///
unittest
{
    // The synchronous implementation and the base interface both satisfy the concept.
    static assert(isScheduler!LocalScheduler);
    static assert(isScheduler!Scheduler);
}
46 
/// Evaluates to `true` when `T` satisfies `isScheduler` and additionally has a
/// `schedule(void delegate(), Duration)` member whose result can initialize a `CancellationToken`.
template isAsyncScheduler(T)
{
    enum bool isAsyncScheduler = isScheduler!T && is(typeof({
                T candidate = void;

                void delegate() job = null;
                Duration delay = void;
                CancellationToken handle = candidate.schedule(job, delay);
            }));
}
55 
///
unittest
{
    // Schedulers that expose `schedule` are asynchronous ...
    static assert(isAsyncScheduler!AsyncScheduler);
    static assert(isAsyncScheduler!ThreadScheduler);
    static assert(isAsyncScheduler!TaskPoolScheduler);
    static assert(isAsyncScheduler!(HistoricalScheduler!ThreadScheduler));
    static assert(isAsyncScheduler!(HistoricalScheduler!TaskPoolScheduler));

    // ... while those that only expose `start` are not.
    static assert(!isAsyncScheduler!Scheduler);
    static assert(!isAsyncScheduler!LocalScheduler);
}
68 
/// Minimal scheduler contract: something that can be asked to run a unit of work.
interface Scheduler
{
    /// Requests execution of `op`; where and when it runs is up to the implementation.
    void start(void delegate() op);
}
75 
/// Scheduler that can additionally be asked to run work with a `Duration` argument,
/// handing back a `CancellationToken` for the request.
interface AsyncScheduler : Scheduler
{
    /// Requests execution of `op` using the duration `val` and returns the token for that request.
    CancellationToken schedule(void delegate() op, Duration val);
}
82 
/// Scheduler that performs the work synchronously, inside the call to `start`.
class LocalScheduler : Scheduler
{
public:
    /// Calls `op` directly and returns once it has finished.
    void start(void delegate() op)
    {
        op();
    }
}
93 
/// Scheduler that runs every unit of work on a newly created `Thread`.
class ThreadScheduler : AsyncScheduler
{
    /// Creates a new thread whose body is `op` and starts it.
    void start(void delegate() op)
    {
        auto worker = new Thread(op);
        worker.start();
    }

    /// Starts a thread that sleeps until `val` has elapsed (measured from this call)
    /// and then invokes `op`, unless the returned token was canceled in the meantime.
    CancellationToken schedule(void delegate() op, Duration val)
    {
        // The deadline is fixed at call time, before the worker thread exists.
        immutable deadline = MonoTime.currTime + val;
        auto token = new CancellationToken;

        void delegate() delayed = {
            // Canceled before the worker even got to run: nothing to do.
            if (token.isCanceled)
                return;

            // Sleep only for the time that is still left; a past deadline means no sleep.
            immutable remaining = deadline - MonoTime.currTime;
            if (remaining > Duration.zero)
                Thread.sleep(remaining);

            // Re-check after the sleep, the token may have been canceled while waiting.
            if (token.isCanceled)
                return;
            op();
        };

        start(delayed);
        return token;
    }
}
121 
unittest
{
    import rx.util : EventSignal;
    import std.stdio : writeln;

    writeln("Testing ThreadScheduler...");
    scope (exit)
        writeln("ThreadScheduler test is completed.");

    // A short delayed job must run and must leave its token un-canceled.
    auto threads = new ThreadScheduler;
    auto finished = new EventSignal;
    bool fired = false;

    void delegate() job = { fired = true; finished.setSignal(); };
    auto token = threads.schedule(job, 10.msecs);

    finished.wait();
    assert(fired);
    assert(!token.isCanceled);
}
141 
/// Scheduler that submits every unit of work to a `std.parallelism.TaskPool`.
class TaskPoolScheduler : AsyncScheduler
{
public:
    /// Uses `pool` for all work; when `pool` is `null` the global `taskPool` is used instead.
    this(TaskPool pool = null)
    {
        _pool = (pool is null) ? taskPool : pool;
    }

public:
    /// Wraps `op` in a task and puts it on the pool.
    void start(void delegate() op)
    {
        auto job = task(op);
        _pool.put(job);
    }

    /// Submits a task that sleeps until `val` has elapsed (measured from this call)
    /// and then invokes `op`, unless the returned token was canceled in the meantime.
    CancellationToken schedule(void delegate() op, Duration val)
    {
        // The deadline is fixed at call time, not when a worker picks the task up.
        immutable deadline = MonoTime.currTime + val;
        auto token = new CancellationToken;

        void delegate() delayed = {
            // Canceled before a worker picked the task up: nothing to do.
            if (token.isCanceled)
                return;

            // Sleep only for the time that is still left; a past deadline means no sleep.
            immutable remaining = deadline - MonoTime.currTime;
            if (remaining > Duration.zero)
                Thread.sleep(remaining);

            // Re-check after the sleep, the token may have been canceled while waiting.
            if (token.isCanceled)
                return;
            op();
        };

        start(delayed);
        return token;
    }

private:
    TaskPool _pool; // pool that receives every task created by this scheduler
}
182 
unittest
{
    import rx.util : EventSignal;
    import std.parallelism : defaultPoolThreads, totalCPUs;
    import std.stdio : writeln;

    writeln("Testing TaskPoolScheduler...");
    scope (exit)
        writeln("TaskPoolScheduler test is completed.");

    // Extra diagnostics, printed on OSX builds only.
    version (OSX)
    {
        writeln("totalCPUs: ", totalCPUs);
        writeln("defaultPoolThreads: ", defaultPoolThreads);
    }

    // A short delayed job must run and must leave its token un-canceled.
    auto pooled = new TaskPoolScheduler;
    auto finished = new EventSignal;
    bool fired = false;

    void delegate() job = { fired = true; finished.setSignal(); };
    auto token = pooled.schedule(job, 10.msecs);

    finished.wait();
    assert(fired);
    assert(!token.isCanceled);
}
209 
/// Wraps an `AsyncScheduler` and subtracts an accumulated offset from every
/// duration passed to `schedule` before forwarding it to the wrapped scheduler.
class HistoricalScheduler(T) : AsyncScheduler
{
    static assert(is(T : AsyncScheduler));

public:
    /// Wraps `innerScheduler`; the offset starts at zero.
    this(T innerScheduler)
    {
        _innerScheduler = innerScheduler;
        _offset = Duration.zero;
    }

public:
    /// Forwards `op` unchanged to the wrapped scheduler.
    void start(void delegate() op)
    {
        _innerScheduler.start(op);
    }

    /// Forwards `op` to the wrapped scheduler with the duration `val` reduced by the current offset.
    CancellationToken schedule(void delegate() op, Duration val)
    {
        immutable adjusted = val - _offset;
        return _innerScheduler.schedule(op, adjusted);
    }

    /// Increases the offset by `val`; only `schedule` calls made afterwards see the new offset.
    void roll(Duration val)
    {
        _offset = _offset + val;
    }

private:
    T _innerScheduler; // scheduler that actually runs the work
    Duration _offset; // total of all durations passed to roll()
}
/// Convenience factory: wraps `scheduler` in a new `HistoricalScheduler!TScheduler`.
HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
{
    alias Wrapper = HistoricalScheduler!TScheduler;
    return new Wrapper(scheduler);
}
250 
unittest
{
    import std.stdio : writeln;

    writeln("Testing HistoricalScheduler...");
    scope (exit)
        writeln("HistoricalScheduler test is completed.");

    // Schedules a 100 ms job, checks it has not run right away, then waits for it.
    void test(AsyncScheduler scheduler)
    {
        import rx.util : EventSignal;

        auto finished = new EventSignal;
        bool fired = false;

        void delegate() job = { fired = true; finished.setSignal(); };
        auto token = scheduler.schedule(job, dur!"msecs"(100));
        assert(!fired);

        finished.wait();
        assert(fired);
        assert(!token.isCanceled);
    }

    //test(new ThreadScheduler);
    //test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}
279 
unittest
{
    // A job canceled before its delay elapses must never run.
    void test(AsyncScheduler scheduler)
    {
        bool fired = false;
        void delegate() job = { fired = true; };

        auto token = scheduler.schedule(job, dur!"msecs"(50));
        token.cancel();

        // Wait past the original 50 ms delay before checking.
        Thread.sleep(dur!"msecs"(100));
        assert(!fired);
    }

    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}
296 
unittest
{
    import rx.util : EventSignal;
    import std.typetuple : TypeTuple;

    // After rolling the offset 20 s forward, a job scheduled for 10 s is already
    // overdue and must run without waiting out its nominal delay.
    foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler))
    {
        auto scheduler = historicalScheduler(new T);

        scheduler.roll(dur!"seconds"(20));

        auto done = false;
        auto signal = new EventSignal;
        immutable began = MonoTime.currTime;
        auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"seconds"(10));

        // Block until the job has actually run instead of sleeping a fixed 10 ms and
        // hoping the worker was scheduled in time (that made the test flaky under load).
        signal.wait();
        assert(done);
        assert(!c.isCanceled);

        // Still verify the point of the test: the nominal 10 s delay was skipped.
        assert(MonoTime.currTime - began < dur!"seconds"(5));
    }
}
313 
unittest
{
    // Wrapping a scheduler that is not an AsyncScheduler must be rejected at compile time.
    enum bool accepted = __traits(compiles, { HistoricalScheduler!LocalScheduler s; });
    static assert(!accepted);
}
318 
/// Maps a scheduler type `T` to the richest scheduler interface it satisfies:
/// `AsyncScheduler` when `isAsyncScheduler!T` holds, `Scheduler` otherwise.
template MostDerivedScheduler(T)
{
    static assert(isScheduler!T);

    static if (!isAsyncScheduler!T)
    {
        ///
        alias MostDerivedScheduler = Scheduler;
    }
    else
    {
        ///
        alias MostDerivedScheduler = AsyncScheduler;
    }
}
335 
///
unittest
{
    // Types that only provide `start` map to Scheduler.
    static assert(is(MostDerivedScheduler!Scheduler == Scheduler));
    static assert(is(MostDerivedScheduler!LocalScheduler == Scheduler));

    // Types that also provide `schedule` map to AsyncScheduler.
    static assert(is(MostDerivedScheduler!AsyncScheduler == AsyncScheduler));
    static assert(is(MostDerivedScheduler!ThreadScheduler == AsyncScheduler));
    static assert(is(MostDerivedScheduler!TaskPoolScheduler == AsyncScheduler));
    static assert(is(MostDerivedScheduler!(HistoricalScheduler!ThreadScheduler) == AsyncScheduler));
    static assert(is(MostDerivedScheduler!(HistoricalScheduler!TaskPoolScheduler) == AsyncScheduler));
}
355 
/// Class adapter that exposes any scheduler-like `TScheduler` through the
/// interface selected by `MostDerivedScheduler!TScheduler`.
final class SchedulerObject(TScheduler) : MostDerivedScheduler!TScheduler
{
public:
    /// Stores `scheduler` (rvalue overload).
    this(TScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    /// Stores a copy of `scheduler` (lvalue overload).
    this(ref TScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    /// Forwards `op` to the wrapped scheduler's `start`.
    void start(void delegate() op)
    {
        _scheduler.start(op);
    }

    // `schedule` only exists when the wrapped type offers it.
    static if (isAsyncScheduler!TScheduler)
    {
        /// Forwards `op` and `val` to the wrapped scheduler's `schedule` and returns its token.
        CancellationToken schedule(void delegate() op, Duration val)
        {
            return _scheduler.schedule(op, val);
        }
    }

private:
    TScheduler _scheduler; // the wrapped scheduler
}
390 
/// Returns `scheduler` viewed through `MostDerivedScheduler!TScheduler`.
/// A value that already converts implicitly to that interface is returned as is;
/// anything else is wrapped in a new `SchedulerObject!TScheduler`.
MostDerivedScheduler!TScheduler schedulerObject(TScheduler)(auto ref TScheduler scheduler)
{
    alias Target = MostDerivedScheduler!TScheduler;

    // MostDerivedScheduler only ever yields one of these two interfaces.
    static assert(is(Target == AsyncScheduler) || is(Target == Scheduler));

    static if (is(TScheduler : Target))
    {
        return scheduler;
    }
    else
    {
        return new SchedulerObject!TScheduler(scheduler);
    }
}
411 
///
unittest
{
    // --- types that only provide start() ---------------------------------

    struct MyScheduler
    {
        void start(void delegate() op)
        {
        }
    }

    class MyClassScheduler
    {
        void start(void delegate() op)
        {
        }
    }

    class MyClassDerivedScheduler : Scheduler
    {
        void start(void delegate() op)
        {
        }
    }

    // --- types that also provide schedule() ------------------------------

    struct MyAsyncScheduler
    {
        void start(void delegate() op)
        {
        }

        CancellationToken schedule(void delegate() op, Duration val)
        {
            return null;
        }
    }

    class MyClassAsyncScheduler
    {
        void start(void delegate() op)
        {
        }

        CancellationToken schedule(void delegate() op, Duration val)
        {
            return null;
        }
    }

    // Implements Scheduler but has the full async surface.
    class MyClassPartAsyncScheduler : Scheduler
    {
        void start(void delegate() op)
        {
        }

        CancellationToken schedule(void delegate() op, Duration val)
        {
            return null;
        }
    }

    class MyClassDerivedAsyncScheduler : AsyncScheduler
    {
        void start(void delegate() op)
        {
        }

        CancellationToken schedule(void delegate() op, Duration val)
        {
            return null;
        }
    }

    auto plainStruct = MyScheduler();
    auto plainClass = new MyClassScheduler;
    auto derivedClass = new MyClassDerivedScheduler;
    auto asyncStruct = MyAsyncScheduler();
    auto asyncClass = new MyClassAsyncScheduler;
    auto partAsyncClass = new MyClassPartAsyncScheduler;
    auto derivedAsyncClass = new MyClassDerivedAsyncScheduler;

    // Sync-only types come back as Scheduler.
    Scheduler w1 = plainStruct.schedulerObject();
    Scheduler w2 = plainClass.schedulerObject();
    Scheduler w3 = derivedClass.schedulerObject();
    assert(w1 !is null);
    assert(w2 !is null);
    assert(w3 !is null);

    // Types with schedule() come back as AsyncScheduler.
    AsyncScheduler w4 = asyncStruct.schedulerObject();
    AsyncScheduler w5 = asyncClass.schedulerObject();
    AsyncScheduler w6 = partAsyncClass.schedulerObject();
    AsyncScheduler w7 = derivedAsyncClass.schedulerObject();
    assert(w4 !is null);
    assert(w5 !is null);
    assert(w6 !is null);
    assert(w7 !is null);

    // Objects that already implement the target interface are passed through unwrapped.
    assert(w3 is derivedClass);
    assert(w7 is derivedAsyncClass);
}
511 
///
unittest
{
    // A plain struct with a matching start() can be turned into a Scheduler object.
    struct MyScheduler
    {
        void start(void delegate() op)
        {
            op();
        }
    }

    MyScheduler plain;
    Scheduler asInterface = plain.schedulerObject();
    assert(asInterface !is null);
}
527 
/// Observer adapter that forwards every notification to the wrapped observer
/// through `TScheduler.start` instead of calling it directly.
struct ObserveOnObserver(TObserver, TScheduler, E)
{
public:
    static if (hasFailure!TObserver)
    {
        /// Stores the observer, the scheduler and the disposable that is
        /// disposed when forwarding a value throws.
        this(TObserver observer, TScheduler scheduler, Disposable disposable)
        {
            _observer = observer;
            _scheduler = scheduler;
            _disposable = disposable;
        }
    }
    else
    {
        /// Stores the observer and the scheduler.
        this(TObserver observer, TScheduler scheduler)
        {
            _observer = observer;
            _scheduler = scheduler;
        }
    }

public:
    /// Hands delivery of `obj` to the scheduler. When the observer supports `failure`,
    /// an `Exception` thrown while delivering is reported to it and the disposable is disposed.
    void put(E obj)
    {
        void delegate() deliver = {
            static if (hasFailure!TObserver)
            {
                try
                {
                    _observer.put(obj);
                }
                catch (Exception e)
                {
                    _observer.failure(e);
                    _disposable.dispose();
                }
            }
            else
            {
                _observer.put(obj);
            }
        };
        _scheduler.start(deliver);
    }

    static if (hasCompleted!TObserver)
    {
        /// Hands the completion notification to the scheduler.
        void completed()
        {
            void delegate() notify = { _observer.completed(); };
            _scheduler.start(notify);
        }
    }

    static if (hasFailure!TObserver)
    {
        /// Hands the failure notification for `e` to the scheduler.
        void failure(Exception e)
        {
            void delegate() notify = { _observer.failure(e); };
            _scheduler.start(notify);
        }
    }

private:
    TObserver _observer; // receiver of the forwarded notifications
    TScheduler _scheduler; // decides where the forwarding delegates run
    static if (hasFailure!TObserver)
    {
        Disposable _disposable; // disposed after a failed put
    }
}
599 
/// Observable adapter: observers subscribed through it are wrapped in an
/// `ObserveOnObserver` bound to the stored scheduler.
struct ObserveOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;

public:
    /// Stores the source observable and the scheduler handed to every wrapped observer.
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /// Subscribes `observer` to the source via an `ObserveOnObserver`.
    /// For observers with `failure`, a `SingleAssignmentDisposable` is returned that is
    /// also given to the wrapper; otherwise the result of `doSubscribe` is returned directly.
    auto subscribe(TObserver)(TObserver observer)
    {
        alias Wrapper = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);

        static if (!hasFailure!TObserver)
        {
            return doSubscribe(_observable, Wrapper(observer, _scheduler));
        }
        else
        {
            // The wrapper needs the disposable up front, so it is created before subscribing.
            auto subscription = new SingleAssignmentDisposable;
            subscription.setDisposable(disposableObject(doSubscribe(_observable,
                    Wrapper(observer, _scheduler, subscription))));
            return subscription;
        }
    }

private:
    TObservable _observable; // source of the notifications
    TScheduler _scheduler; // scheduler passed to every ObserveOnObserver
}
634 
unittest
{
    import rx.subject : SubjectObject;

    alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto scheduled = TestObservable(source, local);

    // Subscription through the member function.
    bool viaSubscribe = false;
    auto first = scheduled.subscribe((int n) { viaSubscribe = true; });
    scope (exit)
        first.dispose();
    .put(source, 1);
    assert(viaSubscribe);

    // Subscription through the free function doSubscribe.
    bool viaDoSubscribe = false;
    auto second = scheduled.doSubscribe((int n) { viaDoSubscribe = true; });
    scope (exit)
        second.dispose();
    .put(source, 2);
    assert(viaDoSubscribe);
}
661 
/// Convenience factory: builds an `ObserveOnObservable` from `observable` and `scheduler`.
ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, TScheduler scheduler)
{
    alias Result = ObserveOnObservable!(TObservable, TScheduler);
    return Result(observable, scheduler);
}
668 
///
unittest
{
    // Note: std.concurrency is deliberately not imported here. Nothing in this test
    // uses it, and it declares its own `Scheduler` and `scheduler` symbols.
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    import std.array : appender;

    auto buf = appender!(int[]);
    auto observer = observerObject!int(buf);

    // Both subscriptions end up appending to the same buffer,
    // so every value put into the subject shows up twice.
    auto d1 = scheduled.subscribe(buf);
    auto d2 = scheduled.subscribe(observer);

    subject.put(0);
    assert(buf.data.length == 2);

    subject.put(1);
    assert(buf.data.length == 4);
}
693 
unittest
{
    // Note: std.concurrency is deliberately not imported here. Nothing in this test
    // uses it, and it declares its own `Scheduler` and `scheduler` symbols.
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto scheduler = new LocalScheduler;
    auto scheduled = subject.observeOn(scheduler);

    // put only
    struct ObserverA
    {
        void put(int n)
        {
        }
    }

    // put + completed
    struct ObserverB
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    // put + failure
    struct ObserverC
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    // put + completed + failure
    struct ObserverD
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    // Every combination of optional observer members must be accepted.
    scheduled.doSubscribe(ObserverA());
    scheduled.doSubscribe(ObserverB());
    scheduled.doSubscribe(ObserverC());
    scheduled.doSubscribe(ObserverD());

    subject.put(1);
    subject.completed();
}
755 
/// Observable adapter that performs the actual subscription to the source
/// inside work handed to the stored scheduler.
class SubscribeOnObservable(TObservable, TScheduler : Scheduler)
{
    alias ElementType = TObservable.ElementType;

public:
    /// Stores the source observable and the scheduler used for subscribing.
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

public:
    /// Returns a `SingleAssignmentDisposable` right away; the scheduled work subscribes
    /// `observer` to the source and assigns the resulting disposable to it.
    auto subscribe(TObserver)(TObserver observer)
    {
        auto subscription = new SingleAssignmentDisposable;

        void delegate() connect = {
            auto inner = doSubscribe(_observable, observer);
            subscription.setDisposable(disposableObject(inner));
        };
        _scheduler.start(connect);

        return subscription;
    }

private:
    TObservable _observable; // source that is subscribed to on the scheduler
    TScheduler _scheduler; // runs the subscription work
}
785 
unittest
{
    import rx.subject : SubjectObject;

    alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto scheduled = new TestObservable(source, local);

    // Subscription through the member function.
    bool viaSubscribe = false;
    auto first = scheduled.subscribe((int n) { viaSubscribe = true; });
    scope (exit)
        first.dispose();
    .put(source, 1);
    assert(viaSubscribe);

    // Subscription through the free function doSubscribe.
    bool viaDoSubscribe = false;
    auto second = scheduled.doSubscribe((int n) { viaDoSubscribe = true; });
    scope (exit)
        second.dispose();
    .put(source, 2);
    assert(viaDoSubscribe);
}
812 
/// Convenience factory: builds a `SubscribeOnObservable` from `observable` and `scheduler`.
SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, auto ref TScheduler scheduler)
{
    alias Result = SubscribeOnObservable!(TObservable, TScheduler);
    return new Result(observable, scheduler);
}
///
unittest
{
    import rx.observable : defer;

    // Source that emits 100 to every new observer.
    auto source = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });

    auto local = new LocalScheduler;
    auto scheduled = source.subscribeOn(local);

    int received = 0;
    auto subscription = scheduled.doSubscribe((int n) { received = n; });
    scope (exit)
        subscription.dispose();

    // With LocalScheduler the value has already arrived when doSubscribe returns.
    assert(received == 100);
}
///
unittest
{
    import rx.observable : defer;
    import rx.util : EventSignal;

    // Source that emits 100 to every new observer.
    auto source = defer!int((Observer!int observer) {
        .put(observer, 100);
        return NopDisposable.instance;
    });

    auto pooled = new TaskPoolScheduler;
    auto scheduled = source.subscribeOn(pooled);

    auto arrived = new EventSignal;
    int received = 0;
    auto subscription = scheduled.subscribe((int n) { received = n; arrived.setSignal(); });
    scope (exit)
        subscription.dispose();

    // The subscription runs on the pool, so wait for the value before checking it.
    arrived.wait();
    assert(received == 100);
}
861 
unittest
{
    import rx.util : EventSignal;
    import std.algorithm : equal;
    import std.array : Appender;

    auto expected = [1, 2, 3, 4];
    auto collected = Appender!(int[])();
    auto lastSeen = new EventSignal;

    // Collects every value and signals once the final element (4) arrives.
    auto observer = (int n) {
        collected.put(n);
        if (n == 4)
            lastSeen.setSignal();
    };

    auto source = expected.asObservable();
    source.subscribeOn(new ThreadScheduler).subscribe(observer);

    lastSeen.wait();

    assert(equal(collected.data, expected));
}
883 
unittest
{
    import rx.util : EventSignal;
    import std.algorithm : equal;
    import std.array : Appender;

    auto expected = [1, 2, 3, 4];
    auto collected = Appender!(int[])();
    auto lastSeen = new EventSignal;

    // Collects every value and signals once the final element (4) arrives.
    auto observer = (int n) {
        collected.put(n);
        if (n == 4)
            lastSeen.setSignal();
    };

    auto source = expected.asObservable();
    source.subscribeOn(new ThreadScheduler).doSubscribe(observer);

    lastSeen.wait();

    assert(equal(collected.data, expected));
}
905 
unittest
{
    import rx.util : EventSignal;

    auto lastSeen = new EventSignal();
    auto values = [1, 2, 3, 4];

    // Passing a delegate literal straight to subscribe must work as well.
    auto scheduled = values.asObservable().subscribeOn(new ThreadScheduler);
    scheduled.subscribe((int a) {
        if (a == 4)
            lastSeen.setSignal();
    });

    lastSeen.wait();
}
920 
unittest
{
    import rx.util : EventSignal;

    auto lastSeen = new EventSignal();
    auto values = [1, 2, 3, 4];

    // Passing a delegate literal straight to doSubscribe must work as well.
    auto scheduled = values.asObservable().subscribeOn(new ThreadScheduler);
    scheduled.doSubscribe((int a) {
        if (a == 4)
            lastSeen.setSignal();
    });

    lastSeen.wait();
}
935 
unittest
{
    // Only the atomics are needed; the previously imported core.sync.condition
    // and std.typetuple were unused.
    import core.atomic : atomicLoad, atomicOp;
    import rx.util : EventSignal;

    enum N = 4;

    // Starts N jobs on the scheduler and waits until all of them have bumped the counter.
    void test(Scheduler scheduler)
    {
        auto signal = new EventSignal;
        shared int count = 0;
        foreach (n; 0 .. N)
        {
            scheduler.start(() {
                atomicOp!"+="(count, 1);
                Thread.sleep(dur!"msecs"(50));
                if (atomicLoad(count) == N)
                    signal.setSignal();
            });
        }
        signal.wait();

        // Read the shared counter atomically, matching the accesses made by the jobs.
        assert(atomicLoad(count) == N);
    }

    test(new LocalScheduler);
    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}
968 
// Process-wide scheduler handed out by `currentScheduler`; __gshared, so it is not thread-local.
private __gshared Scheduler s_scheduler;

// The scheduler in use at startup is a TaskPoolScheduler constructed with no explicit pool.
shared static this()
{
    auto initial = new TaskPoolScheduler;
    s_scheduler = initial;
}
974 
/// Returns the scheduler currently stored in the module-level slot.
@property Scheduler currentScheduler()
{
    return s_scheduler;
}
980 
/// Stores `scheduler` in the module-level slot and returns it with its static type,
/// so assignments can be chained.
@property TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler)
{
    s_scheduler = scheduler;
    return scheduler;
}
987 
unittest
{
    // Put the previous scheduler back once the test is done.
    Scheduler previous = currentScheduler;
    scope (exit)
        currentScheduler = previous;

    // The setter returns the value it was given, with its concrete type.
    TaskPoolScheduler replacement = new TaskPoolScheduler;
    TaskPoolScheduler returned = currentScheduler = replacement;
    assert(returned is replacement);
}