1 module rx.scheduler;
2 
3 import rx.disposable;
4 import rx.observer;
5 import rx.observable;
6 
7 import core.time;
8 import core.thread : Thread;
9 import std.range : put;
10 import std.parallelism : TaskPool, taskPool, task;
11 
12 //Note:
13 // In single core, taskPool's worker are not initialized.
14 // Some asynchronous algorithm does not work as expected, so at least 1 thread is reserved.
15 version (Disable_ReservePoolThreadsOnSingleCore)
16 {
17 }
18 else
19 {
20     shared static this()
21     {
22         import std.parallelism : defaultPoolThreads;
23 
24         if (defaultPoolThreads == 0)
25             defaultPoolThreads = 1;
26     }
27 }
28 
29 ///
30 interface Scheduler
31 {
32     void start(void delegate() op);
33 }
34 ///
35 interface AsyncScheduler : Scheduler
36 {
37     CancellationToken schedule(void delegate() op, Duration val);
38 }
39 
40 ///
41 class LocalScheduler : Scheduler
42 {
43 public:
44     void start(void delegate() op)
45     {
46         op();
47     }
48 }
49 ///
50 class ThreadScheduler : AsyncScheduler
51 {
52     void start(void delegate() op)
53     {
54         auto t = new Thread(op);
55         t.start();
56     }
57 
58     CancellationToken schedule(void delegate() op, Duration val)
59     {
60         auto target = MonoTime.currTime + val;
61         auto c = new CancellationToken;
62         start({
63             if (c.isCanceled)
64                 return;
65             auto dt = target - MonoTime.currTime;
66             if (dt > Duration.zero)
67                 Thread.sleep(dt);
68             if (!c.isCanceled)
69                 op();
70         });
71         return c;
72     }
73 }
74 
75 unittest
76 {
77     import std.stdio : writeln;
78 
79     writeln("Testing ThreadScheduler...");
80     scope (exit)
81         writeln("ThreadScheduler test is completed.");
82 
83     import rx.util : EventSignal;
84 
85     auto scheduler = new ThreadScheduler;
86     auto signal = new EventSignal;
87     auto done = false;
88     auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);
89 
90     signal.wait();
91     assert(done);
92     assert(!c.isCanceled);
93 }
94 ///
95 class TaskPoolScheduler : AsyncScheduler
96 {
97 public:
98     this(TaskPool pool = null)
99     {
100         if (pool is null)
101             pool = taskPool;
102 
103         _pool = pool;
104     }
105 
106 public:
107     void start(void delegate() op)
108     {
109         _pool.put(task(op));
110     }
111 
112     CancellationToken schedule(void delegate() op, Duration val)
113     {
114         auto target = MonoTime.currTime + val;
115         auto c = new CancellationToken;
116         start({
117             if (c.isCanceled)
118                 return;
119             auto dt = target - MonoTime.currTime;
120             if (dt > Duration.zero)
121                 Thread.sleep(dt);
122             if (!c.isCanceled)
123                 op();
124         });
125         return c;
126     }
127 
128 private:
129     TaskPool _pool;
130 }
131 
132 unittest
133 {
134     import std.stdio : writeln;
135     import std.parallelism : totalCPUs, defaultPoolThreads;
136 
137     writeln("Testing TaskPoolScheduler...");
138     scope (exit)
139         writeln("TaskPoolScheduler test is completed.");
140 
141     version (OSX)
142     {
143         writeln("totalCPUs: ", totalCPUs);
144         writeln("defaultPoolThreads: ", defaultPoolThreads);
145     }
146 
147     import rx.util : EventSignal;
148 
149     auto scheduler = new TaskPoolScheduler;
150     auto signal = new EventSignal;
151     auto done = false;
152     auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs);
153 
154     signal.wait();
155     assert(done);
156     assert(!c.isCanceled);
157 }
158 
159 ///
160 class HistoricalScheduler(T) : AsyncScheduler
161 {
162     static assert(is(T : AsyncScheduler));
163 
164 public:
165     this(T innerScheduler)
166     {
167         _offset = Duration.zero;
168         _innerScheduler = innerScheduler;
169     }
170 
171 public:
172     void start(void delegate() op)
173     {
174         _innerScheduler.start(op);
175     }
176 
177     CancellationToken schedule(void delegate() op, Duration val)
178     {
179         return _innerScheduler.schedule(op, val - _offset);
180     }
181 
182     void roll(Duration val)
183     {
184         _offset += val;
185     }
186 
187 private:
188     T _innerScheduler;
189     Duration _offset;
190 }
191 ///
192 HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler)
193 {
194     return new typeof(return)(scheduler);
195 }
196 
197 unittest
198 {
199     import std.stdio : writeln;
200 
201     writeln("Testing HistoricalScheduler...");
202     scope (exit)
203         writeln("HistoricalScheduler test is completed.");
204 
205     void test(AsyncScheduler scheduler)
206     {
207         import rx.util : EventSignal;
208 
209         bool done = false;
210         auto signal = new EventSignal;
211 
212         auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100));
213         assert(!done);
214 
215         signal.wait();
216         assert(done);
217         assert(!c.isCanceled);
218     }
219 
220     //test(new ThreadScheduler);
221     //test(new TaskPoolScheduler);
222     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
223     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
224 }
225 
226 unittest
227 {
228     void test(AsyncScheduler scheduler)
229     {
230         bool done = false;
231         auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50));
232         c.cancel();
233         Thread.sleep(dur!"msecs"(100));
234         assert(!done);
235     }
236 
237     test(new ThreadScheduler);
238     test(new TaskPoolScheduler);
239     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
240     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
241 }
242 
243 unittest
244 {
245     import std.typetuple : TypeTuple;
246 
247     foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler))
248     {
249         auto scheduler = historicalScheduler(new T);
250 
251         scheduler.roll(dur!"seconds"(20));
252 
253         auto done = false;
254         auto c = scheduler.schedule(() { done = true; }, dur!"seconds"(10));
255         Thread.sleep(dur!"msecs"(10)); // wait for a context switch
256         assert(done);
257     }
258 }
259 
260 unittest
261 {
262     static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; }));
263 }
264 
265 struct ObserveOnObserver(TObserver, TScheduler, E)
266 {
267 public:
268     static if (hasFailure!TObserver)
269     {
270         this(TObserver observer, TScheduler scheduler, Disposable disposable)
271         {
272             _observer = observer;
273             _scheduler = scheduler;
274             _disposable = disposable;
275         }
276     }
277     else
278     {
279         this(TObserver observer, TScheduler scheduler)
280         {
281             _observer = observer;
282             _scheduler = scheduler;
283         }
284     }
285 public:
286     void put(E obj)
287     {
288         _scheduler.start({
289             static if (hasFailure!TObserver)
290             {
291                 try
292                 {
293                     _observer.put(obj);
294                 }
295                 catch (Exception e)
296                 {
297                     _observer.failure(e);
298                     _disposable.dispose();
299                 }
300             }
301             else
302             {
303                 _observer.put(obj);
304             }
305         });
306     }
307 
308     static if (hasCompleted!TObserver)
309     {
310         void completed()
311         {
312             _scheduler.start({ _observer.completed(); });
313         }
314     }
315     static if (hasFailure!TObserver)
316     {
317         void failure(Exception e)
318         {
319             _scheduler.start({ _observer.failure(e); });
320         }
321     }
322 private:
323     TObserver _observer;
324     TScheduler _scheduler;
325     static if (hasFailure!TObserver)
326     {
327         Disposable _disposable;
328     }
329 }
330 
331 struct ObserveOnObservable(TObservable, TScheduler)
332 {
333     alias ElementType = TObservable.ElementType;
334 public:
335     this(TObservable observable, TScheduler scheduler)
336     {
337         _observable = observable;
338         _scheduler = scheduler;
339     }
340 
341 public:
342     auto subscribe(TObserver)(TObserver observer)
343     {
344         alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType);
345         static if (hasFailure!TObserver)
346         {
347             auto disposable = new SingleAssignmentDisposable;
348             disposable.setDisposable(disposableObject(doSubscribe(_observable,
349                     ObserverType(observer, _scheduler, disposable))));
350             return disposable;
351         }
352         else
353         {
354             return doSubscribe(_observable, ObserverType(observer, _scheduler));
355         }
356     }
357 
358 private:
359     TObservable _observable;
360     TScheduler _scheduler;
361 }
362 
363 ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)(
364         auto ref TObservable observable, TScheduler scheduler)
365 {
366     return typeof(return)(observable, scheduler);
367 }
368 
369 unittest
370 {
371     import std.concurrency;
372     import rx.subject;
373 
374     auto subject = new SubjectObject!int;
375     auto scheduler = new LocalScheduler;
376     auto scheduled = subject.observeOn(scheduler);
377 
378     import std.array : appender;
379 
380     auto buf = appender!(int[]);
381     auto observer = observerObject!int(buf);
382 
383     auto d1 = scheduled.subscribe(buf);
384     auto d2 = scheduled.subscribe(observer);
385 
386     subject.put(0);
387     assert(buf.data.length == 2);
388 
389     subject.put(1);
390     assert(buf.data.length == 4);
391 }
392 
393 unittest
394 {
395     import std.concurrency;
396     import rx.subject;
397 
398     auto subject = new SubjectObject!int;
399     auto scheduler = new LocalScheduler;
400     auto scheduled = subject.observeOn(scheduler);
401 
402     struct ObserverA
403     {
404         void put(int n)
405         {
406         }
407     }
408 
409     struct ObserverB
410     {
411         void put(int n)
412         {
413         }
414 
415         void completed()
416         {
417         }
418     }
419 
420     struct ObserverC
421     {
422         void put(int n)
423         {
424         }
425 
426         void failure(Exception e)
427         {
428         }
429     }
430 
431     struct ObserverD
432     {
433         void put(int n)
434         {
435         }
436 
437         void completed()
438         {
439         }
440 
441         void failure(Exception e)
442         {
443         }
444     }
445 
446     scheduled.doSubscribe(ObserverA());
447     scheduled.doSubscribe(ObserverB());
448     scheduled.doSubscribe(ObserverC());
449     scheduled.doSubscribe(ObserverD());
450 
451     subject.put(1);
452     subject.completed();
453 }
454 
455 ///
456 struct SubscribeOnObservable(TObservable, TScheduler : Scheduler)
457 {
458     alias ElementType = TObservable.ElementType;
459 
460 public:
461     this(ref TObservable observable, ref TScheduler scheduler)
462     {
463         _observable = observable;
464         _scheduler = scheduler;
465     }
466 
467 public:
468     auto subscribe(TObserver)(auto ref TObserver observer)
469     {
470         auto disposable = new SingleAssignmentDisposable;
471         _scheduler.start({
472             auto temp = doSubscribe(_observable, observer);
473             disposable.setDisposable(disposableObject(temp));
474         });
475         return disposable;
476     }
477 
478 private:
479     TObservable _observable;
480     TScheduler _scheduler;
481 }
482 
483 unittest
484 {
485     alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler);
486     static assert(isObservable!(TestObservable, int));
487 
488     import rx.subject : SubjectObject;
489 
490     auto sub = new SubjectObject!int;
491     auto scheduler = new LocalScheduler;
492 
493     auto scheduled = TestObservable(sub, scheduler);
494 
495     auto d = scheduled.subscribe((int n) {  });
496     scope (exit)
497         d.dispose();
498 
499     auto d2 = doSubscribe(sub, (int n) {  });
500     scope (exit)
501         d2.dispose();
502 }
503 
504 ///
505 ObserveOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)(
506         auto ref TObservable observable, auto ref TScheduler scheduler)
507 {
508     return typeof(return)(observable, scheduler);
509 }
510 ///
511 unittest
512 {
513     import rx.observable : defer;
514 
515     auto sub = defer!int((Observer!int observer) {
516         .put(observer, 100);
517         return NopDisposable.instance;
518     });
519     auto scheduler = new LocalScheduler;
520 
521     auto scheduled = sub.subscribeOn(scheduler);
522 
523     int value = 0;
524     auto d = scheduled.doSubscribe((int n) { value = n; });
525     scope (exit)
526         d.dispose();
527 
528     assert(value == 100);
529 }
530 ///
531 unittest
532 {
533     import rx.observable : defer;
534     import rx.util : EventSignal;
535 
536     auto sub = defer!int((Observer!int observer) {
537         .put(observer, 100);
538         return NopDisposable.instance;
539     });
540     auto scheduler = new TaskPoolScheduler;
541     auto scheduled = sub.subscribeOn(scheduler);
542 
543     int value = 0;
544     auto signal = new EventSignal;
545     auto d = scheduled.doSubscribe((int n) { value = n; signal.setSignal(); });
546     scope (exit)
547         d.dispose();
548 
549     signal.wait();
550     assert(value == 100);
551 }
552 
553 unittest
554 {
555     import core.atomic;
556     import core.sync.condition;
557     import std.typetuple;
558     import rx.util : EventSignal;
559 
560     enum N = 4;
561 
562     void test(Scheduler scheduler)
563     {
564         auto signal = new EventSignal;
565         shared count = 0;
566         foreach (n; 0 .. N)
567         {
568             scheduler.start(() {
569                 atomicOp!"+="(count, 1);
570                 Thread.sleep(dur!"msecs"(50));
571                 if (atomicLoad(count) == N)
572                     signal.setSignal();
573             });
574         }
575         signal.wait();
576         assert(count == N);
577     }
578 
579     test(new LocalScheduler);
580     test(new ThreadScheduler);
581     test(new TaskPoolScheduler);
582     test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler));
583     test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler));
584 }
585 
586 private __gshared Scheduler s_scheduler;
587 shared static this()
588 {
589     s_scheduler = new TaskPoolScheduler;
590 }
591 
592 Scheduler currentScheduler() @property
593 {
594     return s_scheduler;
595 }
596 
597 TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property
598 {
599     s_scheduler = scheduler;
600     return scheduler;
601 }
602 
603 unittest
604 {
605     Scheduler s = currentScheduler;
606     scope (exit)
607         currentScheduler = s;
608 
609     TaskPoolScheduler s1 = new TaskPoolScheduler;
610     TaskPoolScheduler s2 = currentScheduler = s1;
611     assert(s2 is s1);
612 }