1 module rx.scheduler; 2 3 import rx.disposable; 4 import rx.observer; 5 import rx.observable; 6 7 import core.time; 8 import core.thread : Thread; 9 import std.range : put; 10 import std.parallelism : TaskPool, taskPool, task; 11 12 //Note: 13 // In single core, taskPool's worker are not initialized. 14 // Some asynchronous algorithm does not work as expected, so at least 1 thread is reserved. 15 version (Disable_ReservePoolThreadsOnSingleCore) 16 { 17 } 18 else 19 { 20 shared static this() 21 { 22 import std.parallelism : defaultPoolThreads; 23 24 if (defaultPoolThreads == 0) 25 defaultPoolThreads = 1; 26 } 27 } 28 29 /// 30 interface Scheduler 31 { 32 void start(void delegate() op); 33 } 34 /// 35 interface AsyncScheduler : Scheduler 36 { 37 CancellationToken schedule(void delegate() op, Duration val); 38 } 39 40 /// 41 class LocalScheduler : Scheduler 42 { 43 public: 44 void start(void delegate() op) 45 { 46 op(); 47 } 48 } 49 /// 50 class ThreadScheduler : AsyncScheduler 51 { 52 void start(void delegate() op) 53 { 54 auto t = new Thread(op); 55 t.start(); 56 } 57 58 CancellationToken schedule(void delegate() op, Duration val) 59 { 60 auto target = MonoTime.currTime + val; 61 auto c = new CancellationToken; 62 start({ 63 if (c.isCanceled) 64 return; 65 auto dt = target - MonoTime.currTime; 66 if (dt > Duration.zero) 67 Thread.sleep(dt); 68 if (!c.isCanceled) 69 op(); 70 }); 71 return c; 72 } 73 } 74 75 unittest 76 { 77 import std.stdio : writeln; 78 79 writeln("Testing ThreadScheduler..."); 80 scope (exit) 81 writeln("ThreadScheduler test is completed."); 82 83 import rx.util : EventSignal; 84 85 auto scheduler = new ThreadScheduler; 86 auto signal = new EventSignal; 87 auto done = false; 88 auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs); 89 90 signal.wait(); 91 assert(done); 92 assert(!c.isCanceled); 93 } 94 /// 95 class TaskPoolScheduler : AsyncScheduler 96 { 97 public: 98 this(TaskPool pool = null) 99 { 100 if (pool is null) 101 pool = taskPool; 102 103 _pool = pool; 104 } 105 106 public: 107 void start(void delegate() op) 108 { 109 _pool.put(task(op)); 110 } 111 112 CancellationToken schedule(void delegate() op, Duration val) 113 { 114 auto target = MonoTime.currTime + val; 115 auto c = new CancellationToken; 116 start({ 117 if (c.isCanceled) 118 return; 119 auto dt = target - MonoTime.currTime; 120 if (dt > Duration.zero) 121 Thread.sleep(dt); 122 if (!c.isCanceled) 123 op(); 124 }); 125 return c; 126 } 127 128 private: 129 TaskPool _pool; 130 } 131 132 unittest 133 { 134 import std.stdio : writeln; 135 import std.parallelism : totalCPUs, defaultPoolThreads; 136 137 writeln("Testing TaskPoolScheduler..."); 138 scope (exit) 139 writeln("TaskPoolScheduler test is completed."); 140 141 version (OSX) 142 { 143 writeln("totalCPUs: ", totalCPUs); 144 writeln("defaultPoolThreads: ", defaultPoolThreads); 145 } 146 147 import rx.util : EventSignal; 148 149 auto scheduler = new TaskPoolScheduler; 150 auto signal = new EventSignal; 151 auto done = false; 152 auto c = scheduler.schedule({ done = true; signal.setSignal(); }, 10.msecs); 153 154 signal.wait(); 155 assert(done); 156 assert(!c.isCanceled); 157 } 158 159 /// 160 class HistoricalScheduler(T) : AsyncScheduler 161 { 162 static assert(is(T : AsyncScheduler)); 163 164 public: 165 this(T innerScheduler) 166 { 167 _offset = Duration.zero; 168 _innerScheduler = innerScheduler; 169 } 170 171 public: 172 void start(void delegate() op) 173 { 174 _innerScheduler.start(op); 175 } 176 177 CancellationToken schedule(void delegate() op, Duration val) 178 { 179 return _innerScheduler.schedule(op, val - _offset); 180 } 181 182 void roll(Duration val) 183 { 184 _offset += val; 185 } 186 187 private: 188 T _innerScheduler; 189 Duration _offset; 190 } 191 /// 192 HistoricalScheduler!TScheduler historicalScheduler(TScheduler)(auto ref TScheduler scheduler) 193 { 194 return new typeof(return)(scheduler); 195 } 196 197 unittest 198 { 199 import std.stdio : writeln; 200 201 writeln("Testing HistoricalScheduler..."); 202 scope (exit) 203 writeln("HistoricalScheduler test is completed."); 204 205 void test(AsyncScheduler scheduler) 206 { 207 import rx.util : EventSignal; 208 209 bool done = false; 210 auto signal = new EventSignal; 211 212 auto c = scheduler.schedule(() { done = true; signal.setSignal(); }, dur!"msecs"(100)); 213 assert(!done); 214 215 signal.wait(); 216 assert(done); 217 assert(!c.isCanceled); 218 } 219 220 //test(new ThreadScheduler); 221 //test(new TaskPoolScheduler); 222 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 223 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 224 } 225 226 unittest 227 { 228 void test(AsyncScheduler scheduler) 229 { 230 bool done = false; 231 auto c = scheduler.schedule(() { done = true; }, dur!"msecs"(50)); 232 c.cancel(); 233 Thread.sleep(dur!"msecs"(100)); 234 assert(!done); 235 } 236 237 test(new ThreadScheduler); 238 test(new TaskPoolScheduler); 239 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 240 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 241 } 242 243 unittest 244 { 245 import std.typetuple : TypeTuple; 246 247 foreach (T; TypeTuple!(ThreadScheduler, TaskPoolScheduler)) 248 { 249 auto scheduler = historicalScheduler(new T); 250 251 scheduler.roll(dur!"seconds"(20)); 252 253 auto done = false; 254 auto c = scheduler.schedule(() { done = true; }, dur!"seconds"(10)); 255 Thread.sleep(dur!"msecs"(10)); // wait for a context switch 256 assert(done); 257 } 258 } 259 260 unittest 261 { 262 static assert(!__traits(compiles, { HistoricalScheduler!LocalScheduler s; })); 263 } 264 265 struct ObserveOnObserver(TObserver, TScheduler, E) 266 { 267 public: 268 static if (hasFailure!TObserver) 269 { 270 this(TObserver observer, TScheduler scheduler, Disposable disposable) 271 { 272 _observer = observer; 273 _scheduler = scheduler; 274 _disposable = disposable; 275 } 276 } 277 else 278 { 279 this(TObserver observer, TScheduler scheduler) 280 { 281 _observer = observer; 282 _scheduler = scheduler; 283 } 284 } 285 public: 286 void put(E obj) 287 { 288 _scheduler.start({ 289 static if (hasFailure!TObserver) 290 { 291 try 292 { 293 _observer.put(obj); 294 } 295 catch (Exception e) 296 { 297 _observer.failure(e); 298 _disposable.dispose(); 299 } 300 } 301 else 302 { 303 _observer.put(obj); 304 } 305 }); 306 } 307 308 static if (hasCompleted!TObserver) 309 { 310 void completed() 311 { 312 _scheduler.start({ _observer.completed(); }); 313 } 314 } 315 static if (hasFailure!TObserver) 316 { 317 void failure(Exception e) 318 { 319 _scheduler.start({ _observer.failure(e); }); 320 } 321 } 322 private: 323 TObserver _observer; 324 TScheduler _scheduler; 325 static if (hasFailure!TObserver) 326 { 327 Disposable _disposable; 328 } 329 } 330 331 struct ObserveOnObservable(TObservable, TScheduler) 332 { 333 alias ElementType = TObservable.ElementType; 334 public: 335 this(TObservable observable, TScheduler scheduler) 336 { 337 _observable = observable; 338 _scheduler = scheduler; 339 } 340 341 public: 342 auto subscribe(TObserver)(TObserver observer) 343 { 344 alias ObserverType = ObserveOnObserver!(TObserver, TScheduler, TObservable.ElementType); 345 static if (hasFailure!TObserver) 346 { 347 auto disposable = new SingleAssignmentDisposable; 348 disposable.setDisposable(disposableObject(doSubscribe(_observable, 349 ObserverType(observer, _scheduler, disposable)))); 350 return disposable; 351 } 352 else 353 { 354 return doSubscribe(_observable, ObserverType(observer, _scheduler)); 355 } 356 } 357 358 private: 359 TObservable _observable; 360 TScheduler _scheduler; 361 } 362 363 ObserveOnObservable!(TObservable, TScheduler) observeOn(TObservable, TScheduler : Scheduler)( 364 auto ref TObservable observable, TScheduler scheduler) 365 { 366 return typeof(return)(observable, scheduler); 367 } 368 369 unittest 370 { 371 import std.concurrency; 372 import rx.subject; 373 374 auto subject = new SubjectObject!int; 375 auto scheduler = new LocalScheduler; 376 auto scheduled = subject.observeOn(scheduler); 377 378 import std.array : appender; 379 380 auto buf = appender!(int[]); 381 auto observer = observerObject!int(buf); 382 383 auto d1 = scheduled.subscribe(buf); 384 auto d2 = scheduled.subscribe(observer); 385 386 subject.put(0); 387 assert(buf.data.length == 2); 388 389 subject.put(1); 390 assert(buf.data.length == 4); 391 } 392 393 unittest 394 { 395 import std.concurrency; 396 import rx.subject; 397 398 auto subject = new SubjectObject!int; 399 auto scheduler = new LocalScheduler; 400 auto scheduled = subject.observeOn(scheduler); 401 402 struct ObserverA 403 { 404 void put(int n) 405 { 406 } 407 } 408 409 struct ObserverB 410 { 411 void put(int n) 412 { 413 } 414 415 void completed() 416 { 417 } 418 } 419 420 struct ObserverC 421 { 422 void put(int n) 423 { 424 } 425 426 void failure(Exception e) 427 { 428 } 429 } 430 431 struct ObserverD 432 { 433 void put(int n) 434 { 435 } 436 437 void completed() 438 { 439 } 440 441 void failure(Exception e) 442 { 443 } 444 } 445 446 scheduled.doSubscribe(ObserverA()); 447 scheduled.doSubscribe(ObserverB()); 448 scheduled.doSubscribe(ObserverC()); 449 scheduled.doSubscribe(ObserverD()); 450 451 subject.put(1); 452 subject.completed(); 453 } 454 455 /// 456 struct SubscribeOnObservable(TObservable, TScheduler : Scheduler) 457 { 458 alias ElementType = TObservable.ElementType; 459 460 public: 461 this(ref TObservable observable, ref TScheduler scheduler) 462 { 463 _observable = observable; 464 _scheduler = scheduler; 465 } 466 467 public: 468 auto subscribe(TObserver)(auto ref TObserver observer) 469 { 470 auto disposable = new SingleAssignmentDisposable; 471 _scheduler.start({ 472 auto temp = doSubscribe(_observable, observer); 473 disposable.setDisposable(disposableObject(temp)); 474 }); 475 return disposable; 476 } 477 478 private: 479 TObservable _observable; 480 TScheduler _scheduler; 481 } 482 483 unittest 484 { 485 alias TestObservable = ObserveOnObservable!(Observable!int, Scheduler); 486 static assert(isObservable!(TestObservable, int)); 487 488 import rx.subject : SubjectObject; 489 490 auto sub = new SubjectObject!int; 491 auto scheduler = new LocalScheduler; 492 493 auto scheduled = TestObservable(sub, scheduler); 494 495 auto d = scheduled.subscribe((int n) { }); 496 scope (exit) 497 d.dispose(); 498 499 auto d2 = doSubscribe(sub, (int n) { }); 500 scope (exit) 501 d2.dispose(); 502 } 503 504 /// 505 ObserveOnObservable!(TObservable, TScheduler) subscribeOn(TObservable, TScheduler : Scheduler)( 506 auto ref TObservable observable, auto ref TScheduler scheduler) 507 { 508 return typeof(return)(observable, scheduler); 509 } 510 /// 511 unittest 512 { 513 import rx.observable : defer; 514 515 auto sub = defer!int((Observer!int observer) { 516 .put(observer, 100); 517 return NopDisposable.instance; 518 }); 519 auto scheduler = new LocalScheduler; 520 521 auto scheduled = sub.subscribeOn(scheduler); 522 523 int value = 0; 524 auto d = scheduled.doSubscribe((int n) { value = n; }); 525 scope (exit) 526 d.dispose(); 527 528 assert(value == 100); 529 } 530 /// 531 unittest 532 { 533 import rx.observable : defer; 534 import rx.util : EventSignal; 535 536 auto sub = defer!int((Observer!int observer) { 537 .put(observer, 100); 538 return NopDisposable.instance; 539 }); 540 auto scheduler = new TaskPoolScheduler; 541 auto scheduled = sub.subscribeOn(scheduler); 542 543 int value = 0; 544 auto signal = new EventSignal; 545 auto d = scheduled.doSubscribe((int n) { value = n; signal.setSignal(); }); 546 scope (exit) 547 d.dispose(); 548 549 signal.wait(); 550 assert(value == 100); 551 } 552 553 unittest 554 { 555 import core.atomic; 556 import core.sync.condition; 557 import std.typetuple; 558 import rx.util : EventSignal; 559 560 enum N = 4; 561 562 void test(Scheduler scheduler) 563 { 564 auto signal = new EventSignal; 565 shared count = 0; 566 foreach (n; 0 .. N) 567 { 568 scheduler.start(() { 569 atomicOp!"+="(count, 1); 570 Thread.sleep(dur!"msecs"(50)); 571 if (atomicLoad(count) == N) 572 signal.setSignal(); 573 }); 574 } 575 signal.wait(); 576 assert(count == N); 577 } 578 579 test(new LocalScheduler); 580 test(new ThreadScheduler); 581 test(new TaskPoolScheduler); 582 test(new HistoricalScheduler!ThreadScheduler(new ThreadScheduler)); 583 test(new HistoricalScheduler!TaskPoolScheduler(new TaskPoolScheduler)); 584 } 585 586 private __gshared Scheduler s_scheduler; 587 shared static this() 588 { 589 s_scheduler = new TaskPoolScheduler; 590 } 591 592 Scheduler currentScheduler() @property 593 { 594 return s_scheduler; 595 } 596 597 TScheduler currentScheduler(TScheduler : Scheduler)(TScheduler scheduler) @property 598 { 599 s_scheduler = scheduler; 600 return scheduler; 601 } 602 603 unittest 604 { 605 Scheduler s = currentScheduler; 606 scope (exit) 607 currentScheduler = s; 608 609 TaskPoolScheduler s1 = new TaskPoolScheduler; 610 TaskPoolScheduler s2 = currentScheduler = s1; 611 assert(s2 is s1); 612 }