/+++++++++++++++++++++++++++++
 + This module is a submodule of rx.range.
 + It provides the basic operation 'takeUntil'.
 +/
module rx.range.takeUntil;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util;

import std.range : put;

//####################
// TakeUntil
//####################
/**
 * Creates an observable that forwards `source` to its observer until
 * `stopper` produces its first value.
 *
 * On the first value from `stopper`, the observer's `completed()` is invoked
 * (only when the observer type provides one, as tested by `hasCompleted`) and
 * the subscription to `source` is disposed. Any further values from `stopper`
 * are ignored, so the observer is notified of completion at most once by this
 * operator.
 *
 * Params:
 *   source  = the observable whose notifications are passed to the observer
 *   stopper = the observable whose first value ends the forwarding
 *
 * Returns:
 *   A struct exposing `ElementType` (same as `TObservable1.ElementType`) and a
 *   `subscribe` method. `subscribe` returns a `CompositeDisposable` built from
 *   the subscriptions to both `source` and `stopper`.
 *
 * NOTE(review): only values emitted by `stopper` are handled here; how
 * `doSubscribe` treats completion or failure of `stopper` when given a single
 * value callback is not visible in this module - confirm against rx.observable.
 */
auto takeUntil(TObservable1, TObservable2)(auto ref TObservable1 source, auto ref TObservable2 stopper)
        if (isObservable!TObservable1 && isObservable!TObservable2)
{
    static struct TakeUntilObservable
    {
        alias ElementType = TObservable1.ElementType;

        TObservable1 source;
        TObservable2 stopper;

        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            // Subscribe to the source first so its disposable can be captured
            // by the stopper callback below.
            auto sourceDisposable = source.doSubscribe(observer);

            // Guards the callback so that it acts only on the first stopper
            // value; without it every later stopper value would call
            // completed() on the observer again.
            bool stopped = false;

            auto stopperDisposable = stopper.doSubscribe((TObservable2.ElementType _) {
                if (stopped)
                {
                    return;
                }
                stopped = true;

                static if (hasCompleted!TObserver)
                {
                    observer.completed();
                }
                sourceDisposable.dispose();
            });

            return new CompositeDisposable(sourceDisposable, stopperDisposable);
        }
    }

    return TakeUntilObservable(source, stopper);
}

///
unittest
{
    import std.algorithm;
    import rx;

    auto source = new SubjectObject!int;
    auto stopper = new SubjectObject!int;

    int[] buf;
    auto disposable = source.takeUntil(stopper).doSubscribe!((n) { buf ~= n; });

    source.put(0);
    source.put(1);
    source.put(2);

    // The first value from the stopper ends the forwarding.
    stopper.put(0);

    source.put(3);
    source.put(4);

    assert(equal(buf, [0, 1, 2]));
}

unittest
{
    import std.algorithm;
    import rx;

    auto sub1 = new SubjectObject!int;
    auto sub2 = new SubjectObject!int;

    int[] buf;
    auto disposable = sub1.takeUntil(sub2).subscribe((int n) { buf ~= n; });

    sub1.put(0);

    // Disposing the returned disposable stops delivery from sub1.
    disposable.dispose();

    sub1.put(1);

    assert(equal(buf, [0]));
}