/// Zip operator: combines the values of several observables position by position.
module rx.range.zip;

import rx.disposable;
import rx.observable;
import rx.observer;

import std.range : put;
import std.typecons : tuple;
import std.meta;
import std.container.dlist : DList;

/// Observable that publishes `selector(v0, v1, ...)` each time one value is
/// available from every source, taking the values in arrival order per source.
///
/// Params:
///   selector = callable applied to one value of each source, in source order
///   TObservables = types of the source observables
class ZipNObservable(alias selector, TObservables...)
{
    /// Type returned by `selector` when called with one element of each source.
    alias ElementType = typeof({
        GetElementsTuple!TObservables values = void;
        return selector(values);
    }());

    // The source observables, in the order their values are passed to `selector`.
    TObservables sources;

    /// Stores the sources; nothing is subscribed until `subscribe` is called.
    this(TObservables observables)
    {
        sources = observables;
    }

    /// Creates a fresh `ZipNSink` for `observer`, runs it and returns the
    /// disposable produced by `ZipNSink.run`.
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        auto sink = new ZipNSink!(selector, ElementType, TObserver, TObservables)(sources, observer);
        return sink.run();
    }
}

// Per-subscription state of a zip: one queue per source plus the downstream
// observer. Child observers (one per source) feed the queues under `gate`.
class ZipNSink(alias selector, E, TObserver, TObservables...)
{
    alias ItemTypes = GetElementsTuple!TObservables;
    alias Store = GetZipStoreType!(TObserver, TObservables);

    TObservables sources;
    TObserver observer;
    Store store; // store[i] buffers the values of sources[i]
    Object gate; // monitor guarding `store` and calls into `observer`
    Disposable cancel; // disposes the whole subscription; assigned in run()

    this(TObservables sources, TObserver observer)
    {
        this.sources = sources;
        this.observer = observer;
        this.gate = new Object;
    }

    // Subscribes a child observer to every source and returns a disposable
    // that releases all of those subscriptions and clears the queues.
    auto run()
    {
        auto disposable = new SingleAssignmentDisposable;
        // Assigned before subscribing so that a child observer can already
        // call cancel.dispose() if a source notifies during subscription.
        this.cancel = disposable;

        Disposable[] disposables;

        static foreach (i; 0 .. TObservables.length)
        {
            disposables ~= disposableObject(sources[i].doSubscribe(new ZipChildObserver!(i,
                    TObserver, ItemTypes[i])));
            // Drop the values buffered for source i.
            disposables ~= new AnonymousDisposable({ store[i].queue.clear(); });
        }
        // Clear every queue once all subscriptions are listed.
        disposables ~= new AnonymousDisposable({
            static foreach (i; 0 .. TObservables.length)
            {
                store[i].queue.clear();
            }
        });
        disposable.setDisposable(new CompositeDisposable(disposables));

        return disposable;
    }

    // True when every source has at least one buffered value.
    bool hasElements()
    {
        static foreach (i; 0 .. TObservables.length)
        {
            if (store[i].empty)
                return false;
        }
        return true;
    }

    static if (hasCompleted!TObserver)
    {
        // True when every source has been flagged as completed.
        bool isCompleted()
        {
            static foreach (i; 0 .. TObservables.length)
            {
                if (!store[i].isCompleted)
                    return false;
            }
            return true;
        }
    }

    // Called (under `gate`) after a value was buffered: when every queue has a
    // value, pops one from each, applies `selector` and forwards the result.
    void enqueue()
    {

        if (hasElements())
        {
            ItemTypes items;
            static foreach (i; 0 .. TObservables.length)
            {
                items[i] = store[i].dequeue();
            }

            static if (hasFailure!TObserver)
            {
                E result = void;
                try
                {
                    result = selector(items);
                }
                catch (Exception e)
                {
                    observer.failure(e);
                    // The stream is terminated: release the sources so no
                    // further values reach the observer after `failure`,
                    // matching what happens when a source itself fails.
                    cancel.dispose();
                    return;
                }
                .put(observer, result);
            }
            else
            {
                .put(observer, selector(items));
            }
            return;
        }

        static if (hasCompleted!TObserver)
        {
            if (isCompleted())
            {
                observer.completed();
            }
        }
    }

    static if (hasCompleted!TObserver)
    {
        // Forwards `completed` once all sources have completed.
        void checkCompleted()
        {
            if (isCompleted())
            {
                observer.completed();
            }
        }
    }

    // Forwards an error to the observer when it supports `failure`;
    // does nothing otherwise, so the class also compiles for such observers.
    void failure(Exception e)
    {
        static if (hasFailure!TObserver)
        {
            observer.failure(e);
        }
    }

    // Observer subscribed to sources[index]; `this.outer` is the owning sink.
    class ZipChildObserver(size_t index, TObserver, E)
    {
        // Buffers the value and lets the sink emit if a full set is available.
        void put(E obj)
        {
            synchronized (gate)
            {
                store[index].enqueue(obj);
                this.outer.enqueue();
            }
        }

        static if (hasCompleted!TObserver)
        {
            // Flags this source as completed and checks whether all are.
            void completed()
            {
                synchronized (gate)
                {
                    store[index].isCompleted = true;
                    this.outer.checkCompleted();
                }
            }
        }

        // Forwards the error (when the observer supports it) and always
        // disposes the whole zip subscription afterwards.
        void failure(Exception e)
        {
            // Runs after the synchronized block has been left.
            scope (exit)
                cancel.dispose();
            // Same gate as put/completed: the store and the observer must not
            // be touched concurrently with another source's notification.
            synchronized (gate)
            {
                static if (hasFailure!TObserver)
                {
                    // `isCompleted` only exists in the store when the
                    // observer supports `completed`.
                    static if (hasCompleted!TObserver)
                    {
                        store[index].isCompleted = true;
                    }
                    this.outer.observer.failure(e);
                }
            }
        }
    }
}

/// Maps a sequence of observable types to the sequence of their `ElementType`s.
template GetElementsTuple(Ts...)
{
    alias GetElementType(TObservable) = TObservable.ElementType;
    alias GetElementsTuple = AliasSeq!(staticMap!(GetElementType, Ts));
}

/// Sequence of `ZipStore` types, one per source observable, for the given observer type.
alias GetZipStoreType(TObserver, TObservables...) = AliasSeq!(
        staticMap!(GetZipStoreTypeImpl!TObserver, GetElementsTuple!TObservables));

/// Binds the observer type so the inner template can be used with `staticMap`.
template GetZipStoreTypeImpl(TObserver)
{
    /// `ZipStore` for element type `E`; tracks completion only if `TObserver` has `completed`.
    alias GetZipStoreTypeImpl(E) = ZipStore!(E, hasCompleted!TObserver);
}

unittest
{
    import rx;

    alias GetStore = GetZipStoreTypeImpl!(Observer!int);
    alias Store = GetStore!int;

    static assert(is(Store == ZipStore!(int, true)));
}

/// FIFO buffer for the values of one source.
///
/// Params:
///   E = element type of the source
///   useCompleted = when true, the store also carries an `isCompleted` flag
struct ZipStore(E, bool useCompleted)
{
    DList!E queue;
    static if (useCompleted)
    {
        bool isCompleted;
    }

    // True when no value is buffered.
    bool empty() @property
    {
        return queue.empty;
    }

    // Appends `obj` at the back of the queue.
    void enqueue(E obj)
    {
        queue.insertBack(obj);
    }

    // Returns the oldest buffered value and removes it from the queue.
    E dequeue()
    {
        // Remove only after the front value has been read for the return.
        scope (success)
        {
            queue.removeFront();
        }

        return queue.front;
    }
}

unittest
{
    import rx;
    import std.conv : to;
    import std.range : iota;

    alias A = SubjectObject!int;
    alias B = SubjectObject!int;
    alias C = SubjectObject!int;

    alias concatAsStrings = (a, b, c) => to!string(a) ~ to!string(b) ~ to!string(c);

    alias Zip = ZipNObservable!(concatAsStrings, A, B, C);

    auto a = new A;
    auto b = new B;
    auto c = new C;
    auto zipped = new Zip(a, b, c);

    string s;
    auto observer = ((string text) { s = text; }).observerObject!string();
    auto disposable = zipped.subscribe(observer);
    scope (exit)
        disposable.dispose();

    .put(a, iota(3));
    .put(b, iota(3));

    assert(s == null);
    .put(c, 0);
    assert(s == "000");
    .put(c, 1);
    assert(s == "111");
    .put(c, 2);
    assert(s == "222");
    .put(c, 3);
    assert(s == "222");
    .put(a, 3);
    assert(s == "222");
    .put(b, 3);
    assert(s == "333");
}

/// Creates a `ZipNObservable` over `observables`.
///
/// Params:
///   selector = callable combining one value of each source; defaults to
///              `std.typecons.tuple`
template zip(alias selector = tuple)
{
    ZipNObservable!(selector, TObservables) zip(TObservables...)(TObservables observables)
    {
        return new typeof(return)(observables);
    }
}

///
unittest
{
    // use simple
    import rx;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;

    auto zipped = zip(s0, s1);

    int[] buf;
    auto disposable = zipped.doSubscribe!(t => buf ~= (t[0] * t[1]));
    scope (exit)
        disposable.dispose();

    .put(s0, [0, 1, 2, 3]);
    assert(buf.length == 0);

    .put(s1, 0);
    assert(buf == [0]);
    .put(s1, 1);
    assert(buf == [0, 1]);
    .put(s1, 2);
    assert(buf == [0, 1, 4]);
    .put(s1, 3);
    assert(buf == [0, 1, 4, 9]);
}

///
unittest
{
    // call completed
    import rx;
    import std.typecons;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto observer = new CounterObserver!(Tuple!(int, int, int));
    auto disposable = zip(s0, s1, s2).doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    .put(s0, 100);
    .put(s1, 10);
    .put(s2, 1);
    assert(observer.putCount == 1);
    assert(observer.lastValue == tuple(100, 10, 1));

    s0.completed();
    assert(observer.completedCount == 0);
    s1.completed();
    assert(observer.completedCount == 0);
    s2.completed();
    assert(observer.completedCount == 1);
}

///
unittest
{
    // use selector
    import rx;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;

    int[] buf;
    auto disposable = zip!((a, b) => a + b)(s0, s1).doSubscribe!(n => buf ~= n);
    scope (exit)
        disposable.dispose();

    .put(s0, 100);
    .put(s0, 200);
    .put(s1, 10);
    .put(s1, 20);

    assert(buf == [110, 220]);
}

unittest
{
    // advanced
    import rx;
    import std.typecons : Tuple;

    auto sub = new SubjectObject!int;
    auto observer = new CounterObserver!(Tuple!(int, int));

    auto disposable = zip(sub, sub.drop(1)).doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    .put(sub, 0);
    .put(sub, 1);
    assert(observer.putCount == 1);
    assert(observer.lastValue == tuple(0, 1));
    .put(sub, 2);
    assert(observer.putCount == 2);
    assert(observer.lastValue == tuple(1, 2));
}

unittest
{
    // dispose all
    import rx;
    import std.typecons : Tuple;

    alias InnerObserver = CounterObserver!(Tuple!(int, int));

    auto s0 = new TestingSubject!int;
    auto s1 = new TestingSubject!int;
    auto observer = new InnerObserver;

    auto disposable = zip(s0, s1).subscribe(observer);

    import std.conv;

    assert(s0.observerCount == 1, "Error: " ~ to!string(s0.observerCount));
    assert(s1.observerCount == 1, "Error: " ~ to!string(s1.observerCount));
    disposable.dispose();
    assert(s0.observerCount == 0, "Error: " ~ to!string(s0.observerCount));
    assert(s1.observerCount == 0, "Error: " ~ to!string(s1.observerCount));
}

unittest
{
    // unsubscribe when completed
    import rx;
    import std.typecons : Tuple;

    alias InnerObserver = CounterObserver!(Tuple!(int, int));

    auto s0 = new TestingSubject!int;
    auto s1 = new TestingSubject!int;
    auto observer = new InnerObserver;

    auto disposable = zip(s0, s1).subscribe(observer);
    scope (exit)
        disposable.dispose();

    s0.completed();
    assert(s0.observerCount == 0);
    assert(s1.observerCount == 1);
    s1.completed();
    assert(s0.observerCount == 0);
    assert(s1.observerCount == 0);
}

unittest
{
    // if any subject fails then observer.failure is called
    import rx;
    import std.typecons : Tuple;

    auto s0 = new TestingSubject!int;
    auto s1 = new TestingSubject!int;
    auto observer = new CounterObserver!(Tuple!(int, int));
    auto disposable = zip(s0, s1).doSubscribe(observer);
    scope (exit)
        disposable.dispose();

    auto e = new Exception("TEST");
    s0.failure(e);
    assert(observer.failureCount == 1);
    assert(observer.lastException is e);
    s1.failure(new Exception("test"));
    assert(observer.failureCount == 1);
    assert(observer.lastException is e);
}