1 /++
2  + This module defines the concept of Scheduler.
3  +/
4 module rx.scheduler;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 
10 import core.time;
11 import core.thread : Thread;
12 import std.range : put;
13 import std.parallelism : TaskPool, taskPool, task;
14 
// Note:
// On a single-core machine, taskPool's worker threads are not initialized.
// Some asynchronous algorithms then do not work as expected, so at least one thread is reserved.
version (Disable_ReservePoolThreadsOnSingleCore)
{
    // Opt-out: the default pool size of std.parallelism is left untouched.
}
else
{
    // Module constructor: raises `defaultPoolThreads` to 1 when it is reported as 0.
    shared static this()
    {
        import std.parallelism : defaultPoolThreads;

        const bool hasNoWorkers = (defaultPoolThreads == 0);
        if (hasNoWorkers)
        {
            defaultPoolThreads = 1;
        }
    }
}
31 
/++
 + Minimal scheduler abstraction: an object that can be asked to run a delegate.
 +/
interface Scheduler
{
    /// Runs `op`; where and when it executes is decided by the implementation.
    void start(void delegate() op);
}
/++
 + A `Scheduler` that can additionally schedule a delegate together with a `Duration`.
 +/
interface AsyncScheduler : Scheduler
{
    /// Schedules `op` with the duration `val` and returns a `CancellationToken` for the request.
    /// NOTE(review): the implementations in this module treat `val` as a delay before `op`
    /// runs and skip `op` when the token is canceled in time — confirm this is the contract.
    CancellationToken schedule(void delegate() op, Duration val);
}
42 
/// Scheduler that invokes the operation directly inside `start`.
class LocalScheduler : Scheduler
{
    /// Calls `op` synchronously; `start` returns after `op` has returned.
    public void start(void delegate() op)
    {
        op();
    }
}
/// Scheduler that runs every operation on a newly created `Thread`.
class ThreadScheduler : AsyncScheduler
{
    /// Creates a new thread for `op` and starts it.
    void start(void delegate() op)
    {
        (new Thread(op)).start();
    }

    /++
     + Runs `op` on a new thread once `val` has elapsed, measured on `MonoTime`
     + from the moment of this call.
     +
     + The returned token is checked before waiting and again after waiting;
     + `op` is skipped if the token is canceled at either check. The wait itself
     + is a plain `Thread.sleep` and is not interrupted by cancellation.
     +/
    CancellationToken schedule(void delegate() op, Duration val)
    {
        const deadline = MonoTime.currTime + val;
        auto token = new CancellationToken;

        void delayed()
        {
            if (token.isCanceled)
                return;

            // Sleep only for the time still remaining until the deadline.
            const remaining = deadline - MonoTime.currTime;
            if (remaining > Duration.zero)
                Thread.sleep(remaining);

            if (token.isCanceled)
                return;
            op();
        }

        start(&delayed);
        return token;
    }
}
77 
// ThreadScheduler.schedule runs the job after the delay and leaves the token un-canceled.
unittest
{
    import rx.util : EventSignal;
    import std.stdio : writeln;

    writeln("Testing ThreadScheduler...");
    scope (exit)
        writeln("ThreadScheduler test is completed.");

    auto threads = new ThreadScheduler;
    auto finished = new EventSignal;
    bool executed = false;

    void job()
    {
        executed = true;
        finished.setSignal();
    }

    auto token = threads.schedule(&job, 10.msecs);

    // Block until the scheduled job reports completion.
    finished.wait();
    assert(executed);
    assert(!token.isCanceled);
}
/// Scheduler that submits every operation to a `std.parallelism.TaskPool`.
class TaskPoolScheduler : AsyncScheduler
{
    /// Uses `pool` for all work; when `pool` is null the global `taskPool` is used instead.
    public this(TaskPool pool = null)
    {
        _pool = (pool is null) ? taskPool : pool;
    }

    /// Wraps `op` in a task and puts it into the pool.
    public void start(void delegate() op)
    {
        auto job = task(op);
        _pool.put(job);
    }

    /++
     + Submits a task that waits until `val` has elapsed (measured on `MonoTime`
     + from the moment of this call) and then runs `op`.
     +
     + The returned token is checked before waiting and again after waiting;
     + `op` is skipped if the token is canceled at either check. The wait is a
     + `Thread.sleep` executed inside the submitted task.
     +/
    public CancellationToken schedule(void delegate() op, Duration val)
    {
        const deadline = MonoTime.currTime + val;
        auto token = new CancellationToken;

        void delayed()
        {
            if (token.isCanceled)
                return;

            // Sleep only for the time still remaining until the deadline.
            const remaining = deadline - MonoTime.currTime;
            if (remaining > Duration.zero)
                Thread.sleep(remaining);

            if (token.isCanceled)
                return;
            op();
        }

        start(&delayed);
        return token;
    }

    // Pool that receives every submitted task.
    private TaskPool _pool;
}
134 
// TaskPoolScheduler.schedule runs the job after the delay and leaves the token un-canceled.
unittest
{
    import rx.util : EventSignal;
    import std.parallelism : defaultPoolThreads, totalCPUs;
    import std.stdio : writeln;

    writeln("Testing TaskPoolScheduler...");
    scope (exit)
        writeln("TaskPoolScheduler test is completed.");

    // Extra diagnostics, printed on OSX builds only.
    version (OSX)
    {
        writeln("totalCPUs: ", totalCPUs);
        writeln("defaultPoolThreads: ", defaultPoolThreads);
    }

    auto pooled = new TaskPoolScheduler;
    auto finished = new EventSignal;
    bool executed = false;

    void job()
    {
        executed = true;
        finished.setSignal();
    }

    auto token = pooled.schedule(&job, 10.msecs);

    // Block until the scheduled job reports completion.
    finished.wait();
    assert(executed);
    assert(!token.isCanceled);
}
161 
/++
 + Wraps an `AsyncScheduler` and shortens every scheduled delay by an
 + accumulated offset that callers advance through `roll`.
 +/
class HistoricalScheduler(T) : AsyncScheduler
{
    static assert(is(T : AsyncScheduler));

    /// Wraps `innerScheduler`; the offset starts at zero.
    public this(T innerScheduler)
    {
        _innerScheduler = innerScheduler;
    }

    /// Forwards `op` unchanged to the wrapped scheduler.
    public void start(void delegate() op)
    {
        _innerScheduler.start(op);
    }

    /// Schedules `op` on the wrapped scheduler with the delay `val` reduced by the current offset.
    public CancellationToken schedule(void delegate() op, Duration val)
    {
        const adjusted = val - _offset;
        return _innerScheduler.schedule(op, adjusted);
    }

    /// Adds `val` to the accumulated offset.
    public void roll(Duration val)
    {
        _offset = _offset + val;
    }

    // Scheduler that actually performs the work.
    private T _innerScheduler;
    // Total amount rolled so far.
    private Duration _offset = Duration.zero;
}
/// Convenience factory: wraps `scheduler` in a new `HistoricalScheduler`.
HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
{
    alias Wrapper = HistoricalScheduler!TScheduler;
    return new Wrapper(scheduler);
}
199 
// A HistoricalScheduler without any roll still delivers a scheduled job.
unittest
{
    import std.stdio : writeln;

    writeln("Testing HistoricalScheduler...");
    scope (exit)
        writeln("HistoricalScheduler test is completed.");

    // Schedules a 100 ms job, checks it has not run right after scheduling, then waits for it.
    void test(AsyncScheduler scheduler)
    {
        import rx.util : EventSignal;

        auto finished = new EventSignal;
        bool executed = false;

        void job()
        {
            executed = true;
            finished.setSignal();
        }

        auto token = scheduler.schedule(&job, 100.msecs);
        assert(!executed);

        finished.wait();
        assert(executed);
        assert(!token.isCanceled);
    }

    //test(new ThreadScheduler);
    //test(new TaskPoolScheduler);
    auto overThread = new HistoricalScheduler!ThreadScheduler(new ThreadScheduler);
    test(overThread);
    auto overPool = new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler);
    test(overPool);
}
228 
// A job canceled right after scheduling must not run.
unittest
{
    void test(AsyncScheduler scheduler)
    {
        bool executed = false;
        auto token = scheduler.schedule(() { executed = true; }, 50.msecs);
        token.cancel();

        // Wait past the 50 ms delay so a non-canceled job would have run by now.
        Thread.sleep(100.msecs);
        assert(!executed);
    }

    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(historicalScheduler(new ThreadScheduler));
    test(historicalScheduler(new TaskPoolScheduler));
}
245 
// After rolling 20 s, a job scheduled with a 10 s delay runs without waiting.
unittest
{
    import std.typetuple : TypeTuple;

    alias InnerSchedulers = TypeTuple!(ThreadScheduler, TaskPoolScheduler);

    foreach (T; InnerSchedulers)
    {
        auto rolled = new HistoricalScheduler!T(new T);
        rolled.roll(20.seconds);

        bool executed = false;
        auto token = rolled.schedule(() { executed = true; }, 10.seconds);

        Thread.sleep(10.msecs); // wait for a context switch
        assert(executed);
    }
}
262 
// HistoricalScheduler must refuse a scheduler type that is not an AsyncScheduler.
unittest
{
    enum bool accepted = __traits(compiles, { HistoricalScheduler!LocalScheduler s; });
    static assert(!accepted);
}
267 
/++
 + Observer wrapper that forwards every notification to `_observer` through
 + `_scheduler.start`.
 +
 + The constructor shape and the available members depend on what `TObserver`
 + supports (`hasFailure` / `hasCompleted`).
 +/
struct ObserveOnObserver(TObserver, TScheduler, E)
{
public:
    static if (hasFailure!TObserver)
    {
        /// Stores the observer, the scheduler and the disposable that `put` disposes after a failed delivery.
        this(TObserver observer, TScheduler scheduler, Disposable disposable)
        {
            _observer = observer;
            _scheduler = scheduler;
            _disposable = disposable;
        }
    }
    else
    {
        /// Stores the observer and the scheduler; no disposable is kept when the observer has no `failure`.
        this(TObserver observer, TScheduler scheduler)
        {
            _observer = observer;
            _scheduler = scheduler;
        }
    }
public:
    /// Hands delivery of `obj` to the scheduler.
    /// NOTE(review): the delegate literal refers to this struct's members, so it presumably
    /// captures `this` — confirm the instance outlives the scheduled call.
    void put(E obj)
    {
        _scheduler.start({
            static if (hasFailure!TObserver)
            {
                try
                {
                    _observer.put(obj);
                }
                catch (Exception e)
                {
                    // Report the exception to the observer, then dispose the stored disposable.
                    _observer.failure(e);
                    _disposable.dispose();
                }
            }
            else
            {
                // No failure channel: an exception thrown here is not caught by this wrapper.
                _observer.put(obj);
            }
        });
    }

    static if (hasCompleted!TObserver)
    {
        /// Hands the completion notification to the scheduler.
        void completed()
        {
            _scheduler.start({ _observer.completed(); });
        }
    }
    static if (hasFailure!TObserver)
    {
        /// Hands the failure notification `e` to the scheduler.
        void failure(Exception e)
        {
            _scheduler.start({ _observer.failure(e); });
        }
    }
private:
    TObserver _observer;
    TScheduler _scheduler;
    static if (hasFailure!TObserver)
    {
        // Disposed by `put` after a delivery threw an Exception.
        Disposable _disposable;
    }
}
334 
/++
 + Observable wrapper whose subscribers receive their notifications through
 + an `ObserveOnObserver` bound to the given scheduler.
 +/
struct ObserveOnObservable(TObservable, TScheduler : Scheduler)
{
    /// Element type, taken from the wrapped observable.
    alias ElementType = TObservable.ElementType;

public:
    /// Stores the source observable and the scheduler used for delivery.
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

    /++
     + Subscribes `observer` to the source through an `ObserveOnObserver`.
     +
     + When `TObserver` has no `failure`, the result of `doSubscribe` is returned
     + directly. Otherwise a `SingleAssignmentDisposable` is created first, handed
     + to the wrapper, assigned the inner subscription, and returned.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        alias Wrapped = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);

        static if (!hasFailure!TObserver)
        {
            return doSubscribe(_observable, Wrapped(observer, _scheduler));
        }
        else
        {
            // The wrapper needs the handle before the inner subscription exists.
            auto handle = new SingleAssignmentDisposable;
            handle.setDisposable(disposableObject(doSubscribe(_observable,
                    Wrapped(observer, _scheduler, handle))));
            return handle;
        }
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}
367 
// ObserveOnObservable satisfies isObservable and delivers through subscribe and doSubscribe.
unittest
{
    import rx.subject : SubjectObject;

    alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto onLocal = TestObservable(source, local);

    // Member `subscribe`.
    bool viaSubscribe = false;
    auto first = onLocal.subscribe((int n) { viaSubscribe = true; });
    scope (exit)
        first.dispose();
    .put(source, 1);
    assert(viaSubscribe);

    // Free function `doSubscribe`.
    bool viaDoSubscribe = false;
    auto second = onLocal.doSubscribe((int n) { viaDoSubscribe = true; });
    scope (exit)
        second.dispose();
    .put(source, 2);
    assert(viaDoSubscribe);
}
394 
/// Wraps `observable` in an `ObserveOnObservable` that delivers through `scheduler`.
ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, TScheduler scheduler)
{
    alias Result = ObserveOnObservable!(TObservable, TScheduler);
    return Result(observable, scheduler);
}
401 
/// Two subscriptions writing into the same appender: each emitted value is recorded twice.
unittest
{
    import std.concurrency;
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto onLocal = source.observeOn(local);

    auto sink = appender!(int[]);
    auto boxed = observerObject!int(sink);

    auto d1 = onLocal.subscribe(sink);
    auto d2 = onLocal.subscribe(boxed);

    source.put(0);
    assert(sink.data.length == 2);

    source.put(1);
    assert(sink.data.length == 4);
}
426 
// observeOn accepts observers with every combination of completed/failure support.
unittest
{
    import std.concurrency;
    import rx.subject;

    // put only
    struct PutOnly
    {
        void put(int n)
        {
        }
    }

    // put + completed
    struct PutCompleted
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    // put + failure
    struct PutFailure
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    // put + completed + failure
    struct PutCompletedFailure
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto onLocal = source.observeOn(local);

    onLocal.doSubscribe(PutOnly());
    onLocal.doSubscribe(PutCompleted());
    onLocal.doSubscribe(PutFailure());
    onLocal.doSubscribe(PutCompletedFailure());

    source.put(1);
    source.completed();
}
488 
/++
 + Observable wrapper that performs the subscription to the source inside
 + `scheduler.start`.
 +/
class SubscribeOnObservable(TObservable, TScheduler : Scheduler)
{
    /// Element type, taken from the wrapped observable.
    alias ElementType = TObservable.ElementType;

public:
    /// Stores the source observable and the scheduler that will run the subscription.
    this(TObservable observable, TScheduler scheduler)
    {
        _observable = observable;
        _scheduler = scheduler;
    }

    /++
     + Asks the scheduler to subscribe `observer` to the source and returns a
     + `SingleAssignmentDisposable` that is assigned the real subscription once
     + the scheduled operation has run.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        auto handle = new SingleAssignmentDisposable;

        void connect()
        {
            auto inner = doSubscribe(_observable, observer);
            handle.setDisposable(disposableObject(inner));
        }

        _scheduler.start(&connect);
        return handle;
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
}
516 
// SubscribeOnObservable satisfies isObservable and delivers through subscribe and doSubscribe.
unittest
{
    import rx.subject : SubjectObject;

    alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler);
    static assert(isObservable!(TestObservable, int));

    auto source = new SubjectObject!int;
    auto local = new LocalScheduler;
    auto onLocal = new TestObservable(source, local);

    // Member `subscribe`.
    bool viaSubscribe = false;
    auto first = onLocal.subscribe((int n) { viaSubscribe = true; });
    scope (exit)
        first.dispose();
    .put(source, 1);
    assert(viaSubscribe);

    // Free function `doSubscribe`.
    bool viaDoSubscribe = false;
    auto second = onLocal.doSubscribe((int n) { viaDoSubscribe = true; });
    scope (exit)
        second.dispose();
    .put(source, 2);
    assert(viaDoSubscribe);
}
543 
/// Wraps `observable` in a `SubscribeOnObservable` that subscribes through `scheduler`.
SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
        auto ref TObservable observable, auto ref TScheduler scheduler)
{
    alias Result = SubscribeOnObservable!(TObservable, TScheduler);
    return new Result(observable, scheduler);
}
/// With a `LocalScheduler` the value is already delivered when `doSubscribe` returns.
unittest
{
    import rx.observable : defer;

    auto source = defer!int((Observer!int downstream) {
        .put(downstream, 100);
        return NopDisposable.instance;
    });
    auto local = new LocalScheduler;
    auto onLocal = source.subscribeOn(local);

    int received = 0;
    auto subscription = onLocal.doSubscribe((int n) { received = n; });
    scope (exit)
        subscription.dispose();

    assert(received == 100);
}
/// With a `TaskPoolScheduler` the caller waits on a signal for the value to arrive.
unittest
{
    import rx.observable : defer;
    import rx.util : EventSignal;

    auto source = defer!int((Observer!int downstream) {
        .put(downstream, 100);
        return NopDisposable.instance;
    });
    auto pooled = new TaskPoolScheduler;
    auto onPool = source.subscribeOn(pooled);

    int received = 0;
    auto arrived = new EventSignal;
    auto subscription = onPool.subscribe((int n) { received = n; arrived.setSignal(); });
    scope (exit)
        subscription.dispose();

    arrived.wait();
    assert(received == 100);
}
592 
// subscribeOn(ThreadScheduler) + subscribe: all values arrive, in order.
unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto received = Appender!(int[])();
    auto expected = [1, 2, 3, 4];

    auto finished = new EventSignal;
    auto collect = (int n) {
        received.put(n);
        // 4 is the last element of `expected`.
        if (n == 4)
            finished.setSignal();
    };

    auto onThread = expected.asObservable().subscribeOn(new ThreadScheduler);
    onThread.subscribe(collect);

    finished.wait();

    assert(equal(received.data, expected));
}
614 
// subscribeOn(ThreadScheduler) + doSubscribe: all values arrive, in order.
unittest
{
    import std.algorithm : equal;
    import std.array : Appender;
    import rx.util : EventSignal;

    auto received = Appender!(int[])();
    auto expected = [1, 2, 3, 4];

    auto finished = new EventSignal;
    auto collect = (int n) {
        received.put(n);
        // 4 is the last element of `expected`.
        if (n == 4)
            finished.setSignal();
    };

    auto onThread = expected.asObservable().subscribeOn(new ThreadScheduler);
    onThread.doSubscribe(collect);

    finished.wait();

    assert(equal(received.data, expected));
}
636 
// subscribeOn(ThreadScheduler) + subscribe: the last element is eventually delivered.
unittest
{
    import rx.util : EventSignal;

    auto values = [1, 2, 3, 4];
    auto lastSeen = new EventSignal();

    auto onThread = values.asObservable().subscribeOn(new ThreadScheduler);
    onThread.subscribe((int a) {
        if (a == 4)
            lastSeen.setSignal();
    });

    lastSeen.wait();
}
651 
// subscribeOn(ThreadScheduler) + doSubscribe: the last element is eventually delivered.
unittest
{
    import rx.util : EventSignal;

    auto values = [1, 2, 3, 4];
    auto lastSeen = new EventSignal();

    auto onThread = values.asObservable().subscribeOn(new ThreadScheduler);
    onThread.doSubscribe((int a) {
        if (a == 4)
            lastSeen.setSignal();
    });

    lastSeen.wait();
}
666 
// Every scheduler runs all N operations handed to `start`.
unittest
{
    import core.atomic;
    import core.sync.condition;
    import std.typetuple;
    import rx.util : EventSignal;

    enum N = 4;

    // Starts N operations that each bump a shared counter, then waits until
    // one of them observes the counter at N.
    void test(Scheduler scheduler)
    {
        auto signal = new EventSignal;
        shared count = 0;
        foreach (n; 0 .. N)
        {
            scheduler.start(() {
                atomicOp!"+="(count, 1);
                Thread.sleep(dur!"msecs"(50));
                if (atomicLoad(count) == N)
                    signal.setSignal();
            });
        }
        signal.wait();
        // Read the shared counter atomically, as the operations above do,
        // instead of accessing the `shared` variable directly.
        assert(atomicLoad(count) == N);
    }

    test(new LocalScheduler);
    test(new ThreadScheduler);
    test(new TaskPoolScheduler);
    test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
    test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
}
699 
// Module-wide scheduler returned and replaced by `currentScheduler`.
private __gshared Scheduler s_scheduler;

// Module constructor: installs a `TaskPoolScheduler` as the initial scheduler.
shared static this()
{
    auto initial = new TaskPoolScheduler;
    s_scheduler = initial;
}
705 
/// Returns the scheduler currently stored in the module-wide `s_scheduler`.
Scheduler currentScheduler() @property
{
    return s_scheduler;
}
711 
/// Stores `scheduler` in the module-wide `s_scheduler` and returns it with its static type preserved.
/// No null check is performed on `scheduler`.
TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property
{
    s_scheduler = scheduler;
    return scheduler;
}
718 
// The currentScheduler setter returns the instance it was given; the previous scheduler is restored on exit.
unittest
{
    Scheduler previous = currentScheduler;
    scope (exit)
        currentScheduler = previous;

    auto replacement = new TaskPoolScheduler;
    TaskPoolScheduler returned = (currentScheduler = replacement);
    assert(returned is replacement);
}