1 module rx.scheduler;
2 
3 import rx.disposable;
4 import rx.observer;
5 import rx.observable;
6 
7 import core.time;
8 import core.thread : Thread;
9 import std.range : put;
10 import std.parallelism : TaskPool, taskPool, task;
11 
12 //Note:
13 // In single core, taskPool's worker are not initialized.
14 // Some asynchronous algorithm does not work as expected, so at least 1 thread is reserved.
15 version (Disable_ReservePoolThreadsOnSingleCore)
16 {
17 }
18 else
19 {
20     shared static this()
21     {
22         import std.parallelism : defaultPoolThreads;
23 
24         if (defaultPoolThreads == 0)
25             defaultPoolThreads = 1;
26     }
27 }
28 
29 ///
30 interface Scheduler
31 {
32     void start(void delegate() op);
33 }
34 ///
35 interface AsyncScheduler : Scheduler
36 {
37     CancellationToken schedule(void delegate() op, Duration val);
38 }
39 
40 ///
41 class LocalScheduler : Scheduler
42 {
43 public:
44     void start(void delegate() op)
45     {
46         op();
47     }
48 }
49 ///
50 class ThreadScheduler : AsyncScheduler
51 {
52     void start(void delegate() op)
53     {
54         auto t = new Thread(op);
55         t.start();
56     }
57 
58     CancellationToken schedule(void delegate() op, Duration val)
59     {
60         auto target = MonoTime.currTime + val;
61         auto c = new CancellationToken;
62         start({
63             if (c.isCanceled)
64                 return;
65             auto dt = target - MonoTime.currTime;
66             if (dt > Duration.zero)
67                 Thread.sleep(dt);
68             if (!c.isCanceled)
69                 op();
70         });
71         return c;
72     }
73 }
74 
75 unittest
76 {
77     import std.stdio : writeln;
78 
79     writeln("Testing ThreadScheduler...");
80     scope (exit)
81         writeln("ThreadScheduler test is completed.");
82 
83     import rx.util : EventSignal;
84 
85     auto scheduler = new ThreadScheduler;
86     auto signal = new EventSignal;
87     auto done = false;
88     auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);
89 
90     signal.wait();
91     assert(done);
92     assert(!c.isCanceled);
93 }
94 ///
95 class TaskPoolScheduler : AsyncScheduler
96 {
97 public:
98     this(TaskPool pool = null)
99     {
100         if (pool is null)
101             pool = taskPool;
102 
103         _pool = pool;
104     }
105 
106 public:
107     void start(void delegate() op)
108     {
109         _pool.put(task(op));
110     }
111 
112     CancellationToken schedule(void delegate() op, Duration val)
113     {
114         auto target = MonoTime.currTime + val;
115         auto c = new CancellationToken;
116         start({
117             if (c.isCanceled)
118                 return;
119             auto dt = target - MonoTime.currTime;
120             if (dt > Duration.zero)
121                 Thread.sleep(dt);
122             if (!c.isCanceled)
123                 op();
124         });
125         return c;
126     }
127 
128 private:
129     TaskPool _pool;
130 }
131 
132 unittest
133 {
134     import std.stdio : writeln;
135     import std.parallelism : totalCPUs, defaultPoolThreads;
136 
137     writeln("Testing TaskPoolScheduler...");
138     scope (exit)
139         writeln("TaskPoolScheduler test is completed.");
140 
141     version (OSX)
142     {
143         writeln("totalCPUs: ", totalCPUs);
144         writeln("defaultPoolThreads: ", defaultPoolThreads);
145     }
146 
147     import rx.util : EventSignal;
148 
149     auto scheduler = new TaskPoolScheduler;
150     auto signal = new EventSignal;
151     auto done = false;
152     auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);
153 
154     signal.wait();
155     assert(done);
156     assert(!c.isCanceled);
157 }
158 
159 ///
160 class HistoricalScheduler(T) : AsyncScheduler
161 {
162     static assert(is(T : AsyncScheduler));
163 
164 public:
165     this(T innerScheduler)
166     {
167         _offset = Duration.zero;
168         _innerScheduler = innerScheduler;
169     }
170 
171 public:
172     void start(void delegate() op)
173     {
174         _innerScheduler.start(op);
175     }
176 
177     CancellationToken schedule(void delegate() op, Duration val)
178     {
179         return _innerScheduler.schedule(op, val - _offset);
180     }
181 
182     void roll(Duration val)
183     {
184         _offset += val;
185     }
186 
187 private:
188     T _innerScheduler;
189     Duration _offset;
190 }
191 ///
192 HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
193 {
194     return new typeof(return)(scheduler);
195 }
196 
197 unittest
198 {
199     import std.stdio : writeln;
200 
201     writeln("Testing HistoricalScheduler...");
202     scope (exit)
203         writeln("HistoricalScheduler test is completed.");
204 
205     void test(AsyncScheduler scheduler)
206     {
207         import rx.util : EventSignal;
208 
209         bool done = false;
210         auto signal = new EventSignal;
211 
212         auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100));
213         assert(!done);
214 
215         signal.wait();
216         assert(done);
217         assert(!c.isCanceled);
218     }
219 
220     //test(new ThreadScheduler);
221     //test(new TaskPoolScheduler);
222     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
223     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
224 }
225 
226 unittest
227 {
228     void test(AsyncScheduler scheduler)
229     {
230         bool done = false;
231         auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50));
232         c.cancel();
233         Thread.sleep(dur!"msecs"(100));
234         assert(!done);
235     }
236 
237     test(new ThreadScheduler);
238     test(new TaskPoolScheduler);
239     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
240     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
241 }
242 
243 unittest
244 {
245     import std.typetuple : TypeTuple;
246 
247     foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler))
248     {
249         auto scheduler = historicalScheduler(new T);
250 
251         scheduler.roll(dur!"seconds"(20));
252 
253         auto done = false;
254         auto c = scheduler.schedule(() { done = true; }, dur!"seconds"(10));
255         Thread.sleep(dur!"msecs"(10)); // wait for a context switch
256         assert(done);
257     }
258 }
259 
260 unittest
261 {
262     static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; }));
263 }
264 
265 struct ObserveOnObserver(TObserver, TScheduler, E)
266 {
267 public:
268     static if (hasFailure!TObserver)
269     {
270         this(TObserver observer, TScheduler scheduler, Disposable disposable)
271         {
272             _observer = observer;
273             _scheduler = scheduler;
274             _disposable = disposable;
275         }
276     }
277     else
278     {
279         this(TObserver observer, TScheduler scheduler)
280         {
281             _observer = observer;
282             _scheduler = scheduler;
283         }
284     }
285 public:
286     void put(E obj)
287     {
288         _scheduler.start({
289             static if (hasFailure!TObserver)
290             {
291                 try
292                 {
293                     _observer.put(obj);
294                 }
295                 catch (Exception e)
296                 {
297                     _observer.failure(e);
298                     _disposable.dispose();
299                 }
300             }
301             else
302             {
303                 _observer.put(obj);
304             }
305         });
306     }
307 
308     static if (hasCompleted!TObserver)
309     {
310         void completed()
311         {
312             _scheduler.start({ _observer.completed(); });
313         }
314     }
315     static if (hasFailure!TObserver)
316     {
317         void failure(Exception e)
318         {
319             _scheduler.start({ _observer.failure(e); });
320         }
321     }
322 private:
323     TObserver _observer;
324     TScheduler _scheduler;
325     static if (hasFailure!TObserver)
326     {
327         Disposable _disposable;
328     }
329 }
330 
331 struct ObserveOnObservable(TObservable, TScheduler : Scheduler)
332 {
333     alias ElementType = TObservable.ElementType;
334 public:
335     this(TObservable observable, TScheduler scheduler)
336     {
337         _observable = observable;
338         _scheduler = scheduler;
339     }
340 
341 public:
342     auto subscribe(TObserver)(TObserver observer)
343     {
344         alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);
345         static if (hasFailure!TObserver)
346         {
347             auto disposable = new SingleAssignmentDisposable;
348             disposable.setDisposable(disposableObject(doSubscribe(_observable,
349                     ObserverType(observer, _scheduler, disposable))));
350             return disposable;
351         }
352         else
353         {
354             return doSubscribe(_observable, ObserverType(observer, _scheduler));
355         }
356     }
357 
358 private:
359     TObservable _observable;
360     TScheduler _scheduler;
361 }
362 
363 unittest
364 {
365     alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
366     static assert(isObservable!(TestObservable, int));
367 
368     import rx.subject : SubjectObject;
369 
370     auto sub = new SubjectObject!int;
371     auto scheduler = new LocalScheduler;
372 
373     auto scheduled = TestObservable(sub, scheduler);
374 
375     auto flag1 = false;
376     auto d = scheduled.subscribe((int n) { flag1 = true; });
377     scope (exit)
378         d.dispose();
379     .put(sub, 1);
380     assert(flag1);
381 
382     auto flag2 = false;
383     auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
384     scope (exit)
385         d2.dispose();
386     .put(sub, 2);
387     assert(flag2);
388 }
389 
390 ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
391         auto ref TObservable observable, TScheduler scheduler)
392 {
393     return typeof(return)(observable, scheduler);
394 }
395 
396 unittest
397 {
398     import std.concurrency;
399     import rx.subject;
400 
401     auto subject = new SubjectObject!int;
402     auto scheduler = new LocalScheduler;
403     auto scheduled = subject.observeOn(scheduler);
404 
405     import std.array : appender;
406 
407     auto buf = appender!(int[]);
408     auto observer = observerObject!int(buf);
409 
410     auto d1 = scheduled.subscribe(buf);
411     auto d2 = scheduled.subscribe(observer);
412 
413     subject.put(0);
414     assert(buf.data.length == 2);
415 
416     subject.put(1);
417     assert(buf.data.length == 4);
418 }
419 
420 unittest
421 {
422     import std.concurrency;
423     import rx.subject;
424 
425     auto subject = new SubjectObject!int;
426     auto scheduler = new LocalScheduler;
427     auto scheduled = subject.observeOn(scheduler);
428 
429     struct ObserverA
430     {
431         void put(int n)
432         {
433         }
434     }
435 
436     struct ObserverB
437     {
438         void put(int n)
439         {
440         }
441 
442         void completed()
443         {
444         }
445     }
446 
447     struct ObserverC
448     {
449         void put(int n)
450         {
451         }
452 
453         void failure(Exception e)
454         {
455         }
456     }
457 
458     struct ObserverD
459     {
460         void put(int n)
461         {
462         }
463 
464         void completed()
465         {
466         }
467 
468         void failure(Exception e)
469         {
470         }
471     }
472 
473     scheduled.doSubscribe(ObserverA());
474     scheduled.doSubscribe(ObserverB());
475     scheduled.doSubscribe(ObserverC());
476     scheduled.doSubscribe(ObserverD());
477 
478     subject.put(1);
479     subject.completed();
480 }
481 
482 ///
483 class SubscribeOnObservable(TObservable, TScheduler : Scheduler)
484 {
485     alias ElementType = TObservable.ElementType;
486 
487 public:
488     this(TObservable observable, TScheduler scheduler)
489     {
490         _observable = observable;
491         _scheduler = scheduler;
492     }
493 
494 public:
495     auto subscribe(TObserver)(TObserver observer)
496     {
497         auto disposable = new SingleAssignmentDisposable;
498         _scheduler.start({
499             auto temp = doSubscribe(_observable, observer);
500             disposable.setDisposable(disposableObject(temp));
501         });
502         return disposable;
503     }
504 
505 private:
506     TObservable _observable;
507     TScheduler _scheduler;
508 }
509 
510 unittest
511 {
512     alias TestObservable = SubscribeOnObservable!(Observable!int, Scheduler);
513     static assert(isObservable!(TestObservable, int));
514 
515     import rx.subject : SubjectObject;
516 
517     auto sub = new SubjectObject!int;
518     auto scheduler = new LocalScheduler;
519 
520     auto scheduled = new TestObservable(sub, scheduler);
521 
522     auto flag1 = false;
523     auto d = scheduled.subscribe((int n) { flag1 = true; });
524     scope (exit)
525         d.dispose();
526     .put(sub, 1);
527     assert(flag1);
528 
529     auto flag2 = false;
530     auto d2 = scheduled.doSubscribe((int n) { flag2 = true; });
531     scope (exit)
532         d2.dispose();
533     .put(sub, 2);
534     assert(flag2);
535 }
536 
537 ///
538 SubscribeOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
539         auto ref TObservable observable, auto ref TScheduler scheduler)
540 {
541     return new typeof(return)(observable, scheduler);
542 }
543 ///
544 unittest
545 {
546     import rx.observable : defer;
547 
548     auto sub = defer!int((Observer!int observer) {
549         .put(observer, 100);
550         return NopDisposable.instance;
551     });
552     auto scheduler = new LocalScheduler;
553 
554     auto scheduled = sub.subscribeOn(scheduler);
555 
556     int value = 0;
557     auto d = scheduled.doSubscribe((int n) { value = n; });
558     scope (exit)
559         d.dispose();
560 
561     assert(value == 100);
562 }
563 ///
564 unittest
565 {
566     import rx.observable : defer;
567     import rx.util : EventSignal;
568 
569     auto sub = defer!int((Observer!int observer) {
570         .put(observer, 100);
571         return NopDisposable.instance;
572     });
573     auto scheduler = new TaskPoolScheduler;
574     auto scheduled = sub.subscribeOn(scheduler);
575 
576     int value = 0;
577     auto signal = new EventSignal;
578     auto d = scheduled.subscribe((int n) { value = n; signal.setSignal(); });
579     scope (exit)
580         d.dispose();
581 
582     signal.wait();
583     assert(value == 100);
584 }
585 
586 unittest
587 {
588     import std.algorithm : equal;
589     import std.array : Appender;
590     import rx.util : EventSignal;
591 
592     auto buf = Appender!(int[])();
593     auto data = [1, 2, 3, 4];
594 
595     auto event = new EventSignal;
596     auto observer = (int n) {
597         buf.put(n);
598         if (n == 4)
599             event.setSignal();
600     };
601     data.asObservable().subscribeOn(new ThreadScheduler).subscribe(observer);
602 
603     event.wait();
604 
605     assert(equal(buf.data, data));
606 }
607 
608 unittest
609 {
610     import std.algorithm : equal;
611     import std.array : Appender;
612     import rx.util : EventSignal;
613 
614     auto buf = Appender!(int[])();
615     auto data = [1, 2, 3, 4];
616 
617     auto event = new EventSignal;
618     auto observer = (int n) {
619         buf.put(n);
620         if (n == 4)
621             event.setSignal();
622     };
623     data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe(observer);
624 
625     event.wait();
626 
627     assert(equal(buf.data, data));
628 }
629 
630 unittest
631 {
632     import rx.util : EventSignal;
633 
634     auto data = [1, 2, 3, 4];
635     auto event = new EventSignal();
636 
637     data.asObservable().subscribeOn(new ThreadScheduler).subscribe((int a) {
638         if (a == 4)
639             event.setSignal();
640     });
641 
642     event.wait();
643 }
644 
645 unittest
646 {
647     import rx.util : EventSignal;
648 
649     auto data = [1, 2, 3, 4];
650     auto event = new EventSignal();
651 
652     data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe((int a) {
653         if (a == 4)
654             event.setSignal();
655     });
656 
657     event.wait();
658 }
659 
660 unittest
661 {
662     import core.atomic;
663     import core.sync.condition;
664     import std.typetuple;
665     import rx.util : EventSignal;
666 
667     enum N = 4;
668 
669     void test(Scheduler scheduler)
670     {
671         auto signal = new EventSignal;
672         shared count = 0;
673         foreach (n; 0 .. N)
674         {
675             scheduler.start(() {
676                 atomicOp!"+="(count, 1);
677                 Thread.sleep(dur!"msecs"(50));
678                 if (atomicLoad(count) == N)
679                     signal.setSignal();
680             });
681         }
682         signal.wait();
683         assert(count == N);
684     }
685 
686     test(new LocalScheduler);
687     test(new ThreadScheduler);
688     test(new TaskPoolScheduler);
689     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
690     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
691 }
692 
693 private __gshared Scheduler s_scheduler;
694 shared static this()
695 {
696     s_scheduler = new TaskPoolScheduler;
697 }
698 
699 Scheduler currentScheduler() @property
700 {
701     return s_scheduler;
702 }
703 
704 TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property
705 {
706     s_scheduler = scheduler;
707     return scheduler;
708 }
709 
710 unittest
711 {
712     Scheduler s = currentScheduler;
713     scope (exit)
714         currentScheduler = s;
715 
716     TaskPoolScheduler s1 = new TaskPoolScheduler;
717     TaskPoolScheduler s2 = currentScheduler = s1;
718     assert(s2 is s1);
719 }