1 /+++++++++++++++++++++++++++++ 2 + This module defines algorithm 'combineLatest' 3 +/ 4 module rx.algorithm.combineLatest; 5 6 import rx.disposable; 7 import rx.observable; 8 import rx.observer; 9 import rx.util; 10 11 import std.range : put; 12 import std.meta : staticMap, allSatisfy; 13 import std.typecons : Tuple, tuple; 14 15 /// 16 template combineLatest(alias f = tuple) 17 { 18 CombineLatestObservable!(f, TObservables) combineLatest(TObservables...)(TObservables observables) 19 if (allSatisfy!(isObservable, TObservables)) 20 { 21 return typeof(return)(observables); 22 } 23 } 24 25 /// 26 unittest 27 { 28 import rx : SubjectObject, CounterObserver; 29 30 auto hello = new SubjectObject!string; 31 auto world = new SubjectObject!string; 32 33 auto message = combineLatest!((a, b) => a ~ ", " ~ b ~ "!")(hello, world); 34 35 auto observer = new CounterObserver!string; 36 message.doSubscribe(observer); 37 38 .put(hello, "Hello"); 39 .put(world, "world"); 40 41 assert(observer.putCount == 1); 42 assert(observer.lastValue == "Hello, world!"); 43 44 .put(world, "D-man"); 45 assert(observer.putCount == 2); 46 assert(observer.lastValue == "Hello, D-man!"); 47 } 48 49 /// 50 unittest 51 { 52 import rx : SubjectObject, CounterObserver, uniq; 53 54 auto count1 = new SubjectObject!int; 55 auto count2 = new SubjectObject!int; 56 auto count3 = new SubjectObject!int; 57 58 import std.algorithm : max; 59 60 alias pickMax = combineLatest!max; 61 auto observable = pickMax(count1, count2, count3).uniq(); 62 63 auto observer = new CounterObserver!int; 64 observable.doSubscribe(observer); 65 66 .put(count1, 0); 67 .put(count2, 0); 68 .put(count3, 0); 69 70 assert(observer.putCount == 1); 71 assert(observer.lastValue == 0); 72 73 .put(count1, 10); 74 assert(observer.putCount == 2); 75 assert(observer.lastValue == 10); 76 77 .put(count2, 10); 78 assert(observer.putCount == 2); 79 80 .put(count3, 11); 81 assert(observer.putCount == 3); 82 assert(observer.lastValue == 11); 83 } 84 85 unittest 86 { 87 import rx : SubjectObject; 88 89 auto s1 = new SubjectObject!int; 90 auto s2 = new SubjectObject!int; 91 92 auto comb = combineLatest(s1, s2); 93 94 Tuple!(int, int)[] result; 95 comb.doSubscribe!(t => result ~= t); 96 97 .put(s1, 0); 98 .put(s2, 100); 99 assert(result.length == 1); 100 assert(result[0] == tuple(0, 100)); 101 102 .put(s1, 1); 103 assert(result.length == 2); 104 assert(result[1] == tuple(1, 100)); 105 106 .put(s2, 101); 107 assert(result.length == 3); 108 assert(result[2] == tuple(1, 101)); 109 } 110 111 unittest 112 { 113 import rx : SubjectObject, CounterObserver, map; 114 115 auto s1 = new SubjectObject!int; 116 auto s2 = new SubjectObject!int; 117 118 auto sum = combineLatest(s1, s2).map!"a[0] + a[1]"(); 119 120 auto observer = new CounterObserver!int; 121 sum.doSubscribe(observer); 122 123 .put(s1, 1); 124 .put(s2, 2); 125 assert(observer.putCount == 1); 126 assert(observer.lastValue == 3); 127 } 128 129 unittest 130 { 131 import rx : SubjectObject, CounterObserver; 132 133 auto s1 = new SubjectObject!int; 134 auto s2 = new SubjectObject!int; 135 136 auto observable = combineLatest(s1, s2); 137 auto observer = new CounterObserver!(Tuple!(int, int)); 138 auto disposable = observable.doSubscribe(observer); 139 140 disposable.dispose(); 141 142 .put(s1, 1); 143 .put(s2, 2); 144 assert(observer.putCount == 0); 145 } 146 147 unittest 148 { 149 import rx : SubjectObject, CounterObserver; 150 151 auto s1 = new SubjectObject!int; 152 auto s2 = new SubjectObject!int; 153 154 auto observable = combineLatest(s1, s2); 155 auto observer = new CounterObserver!(Tuple!(int, int)); 156 auto disposable = observable.doSubscribe(observer); 157 158 .put(s1, 1); 159 .put(s2, 2); 160 assert(observer.putCount == 1); 161 162 disposable.dispose(); 163 .put(s1, 10); 164 .put(s2, 100); 165 assert(observer.putCount == 1); 166 } 167 168 unittest 169 { 170 import rx : SubjectObject, CounterObserver; 171 172 auto s1 = new SubjectObject!int; 173 auto s2 = new SubjectObject!int; 174 175 auto observable = combineLatest(s1, s2); 176 auto observer = new CounterObserver!(Tuple!(int, int)); 177 auto disposable = observable.doSubscribe(observer); 178 179 s1.completed(); 180 assert(observer.completedCount == 0); 181 s2.completed(); 182 assert(observer.completedCount == 1); 183 } 184 185 unittest 186 { 187 import rx : SubjectObject, CounterObserver; 188 189 auto s1 = new SubjectObject!int; 190 auto s2 = new SubjectObject!int; 191 192 auto observable = combineLatest(s1, s2); 193 auto observer = new CounterObserver!(Tuple!(int, int)); 194 auto disposable = observable.doSubscribe(observer); 195 196 auto ex = new Exception("message"); 197 s1.failure(ex); 198 assert(observer.completedCount == 0); 199 assert(observer.failureCount == 1); 200 assert(observer.lastException is ex); 201 202 .put(s2, 10); 203 assert(observer.putCount == 0); 204 } 205 206 unittest 207 { 208 import rx : SubjectObject, CounterObserver; 209 210 auto s1 = new SubjectObject!int; 211 auto s2 = new SubjectObject!int; 212 213 auto observable = combineLatest(s1, s2); 214 auto observer = new CounterObserver!(Tuple!(int, int)); 215 auto disposable = observable.doSubscribe(observer); 216 217 s1.completed(); 218 assert(observer.completedCount == 0); 219 assert(observer.failureCount == 0); 220 221 auto ex = new Exception("message"); 222 s2.failure(ex); 223 assert(observer.completedCount == 0); 224 assert(observer.failureCount == 1); 225 assert(observer.lastException is ex); 226 } 227 228 unittest 229 { 230 import rx : SubjectObject, CounterObserver; 231 232 auto s1 = new SubjectObject!int; 233 auto s2 = new SubjectObject!int; 234 auto s3 = new SubjectObject!int; 235 auto s4 = new SubjectObject!int; 236 auto s5 = new SubjectObject!int; 237 238 auto observable = combineLatest(s1, s2, s3, s4, s5); 239 auto observer = new CounterObserver!(Tuple!(int, int, int, int, int)); 240 auto disposable = observable.doSubscribe(observer); 241 242 .put(s1, 0); 243 .put(s2, 0); 244 .put(s3, 0); 245 .put(s4, 0); 246 .put(s5, 0); 247 assert(observer.putCount == 1); 248 assert(observer.lastValue == tuple(0, 0, 0, 0, 0)); 249 } 250 251 252 private template GetElementType(T) { 253 alias GetElementType = T.ElementType; 254 } 255 256 struct CombineLatestObservable(alias f, TObservables...) 257 { 258 alias ElementType = typeof(({ 259 alias ElementTypes = staticMap!(GetElementType, TObservables); 260 return f(ElementTypes.init); 261 })()); 262 263 TObservables _observables; 264 265 auto subscribe(TObserver)(TObserver observer) 266 { 267 alias CombCoordinator = CombineLatestCoordinator!(f, TObserver, staticMap!(GetElementType, TObservables)); 268 269 auto subscription = new SingleAssignmentDisposable; 270 auto coordinator = new CombCoordinator(observer, subscription); 271 272 Disposable[TObservables.length] innerSubscriptions; 273 foreach(i, T; TObservables) 274 { 275 alias CombObserver = CombineLatestObserver!(CombCoordinator, TObservables[i].ElementType, i); 276 innerSubscriptions[i] = _observables[i].doSubscribe(CombObserver(coordinator)).disposableObject(); 277 } 278 subscription.setDisposable(new CompositeDisposable(innerSubscriptions)); 279 280 return subscription; 281 } 282 } 283 284 class CombineLatestCoordinator(alias f, TObserver, ElementTypes...) 285 { 286 public: 287 this(TObserver observer, Disposable subscription) 288 { 289 _gate = new Object; 290 _counter = new shared(AtomicCounter)(ElementTypes.length); 291 _observer = observer; 292 _subscription = subscription; 293 } 294 295 public: 296 void innerPut(size_t index)(ElementTypes[index] obj) 297 { 298 if (_counter.isZero) return; 299 300 synchronized (_gate) 301 { 302 _values[index] = obj; 303 _hasValues[index] = true; 304 305 foreach (hasValue; _hasValues) 306 { 307 if (!hasValue) return; 308 } 309 310 .put(_observer, f(_values)); 311 } 312 } 313 314 void innerCompleted() 315 { 316 auto res = _counter.tryDecrement(); 317 if (res.success && res.count == 0) 318 { 319 static if (hasCompleted!TObserver) 320 { 321 _observer.completed(); 322 } 323 _subscription.dispose(); 324 } 325 } 326 327 void innerFailure(Exception e) 328 { 329 if (_counter.trySetZero()) 330 { 331 static if (hasFailure!TObserver) 332 { 333 _observer.failure(e); 334 } 335 _subscription.dispose(); 336 } 337 } 338 339 public: 340 Object _gate; 341 shared(AtomicCounter) _counter; 342 bool[ElementTypes.length] _hasValues; 343 ElementTypes _values; 344 345 TObserver _observer; 346 Disposable _subscription; 347 } 348 349 struct CombineLatestObserver(TCoordinator, E, size_t index) 350 { 351 TCoordinator _parent; 352 353 void put(E obj) 354 { 355 _parent.innerPut!index(obj); 356 } 357 358 void completed() 359 { 360 _parent.innerCompleted(); 361 } 362 363 void failure(Exception e) 364 { 365 _parent.innerFailure(e); 366 } 367 } 368 369 unittest 370 { 371 import rx : CounterObserver; 372 373 alias CombCoordinator = CombineLatestCoordinator!(tuple, Observer!(Tuple!int), int); 374 alias CombObserver = CombineLatestObserver!(CombCoordinator, int, 0); 375 376 auto subscription = new SingleAssignmentDisposable; 377 auto observer = new CounterObserver!(Tuple!int); 378 auto coordinator = new CombCoordinator(observer, subscription); 379 auto co = CombObserver(coordinator); 380 } 381 382 unittest 383 { 384 alias CombObservable = CombineLatestObservable!(tuple, Observable!string, Observable!int); 385 static assert(is(CombObservable.ElementType == Tuple!(string, int))); 386 387 import rx : SubjectObject, CounterObserver; 388 389 auto name = new SubjectObject!string; 390 auto age = new SubjectObject!int; 391 392 auto observable = CombObservable(name, age); 393 auto observer = new CounterObserver!(Tuple!(string, int)); 394 auto subscription = observable.subscribe(observer); 395 396 subscription.dispose(); 397 }